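The Right Way for a Spring Engineer to Get Started with Vert.x

Over the past couple of days I looked at Vert.x, the asynchronous framework for Java. For an introduction, the official website is enough. I also went through some examples on GitHub; they are all very simple, pile the code together, and put almost everything in the startup class. For someone who splits everything into Controller, Service, and Dao layers by reflex, that is hard to bear. So I wrote a demo that organizes the code of a Vert.x CRUD project the way a Spring engineer is used to.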

1 Project file structure

[Image: project file structure]

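  • Vert.x manages routes with a router, so the controller package is replaced by a router package. That makes the layout feel very Spring.
    Dependencies. Many of these packages are included up front so that features can be added later.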
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.yeyeck</groupId>
  <artifactId>vertx</artifactId>
  <version>1.0.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <maven-shade-plugin.version>2.4.3</maven-shade-plugin.version>
    <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
    <exec-maven-plugin.version>1.5.0</exec-maven-plugin.version>
    <vertx.version>4.0.0-milestone5</vertx.version>
    <junit-jupiter.version>5.4.0</junit-jupiter.version>
    <main.verticle>com.yeyeck.vertx.MainVerticle</main.verticle>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-stack-depchain</artifactId>
        <version>${vertx.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-auth-shiro</artifactId>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-web</artifactId>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-mysql-client</artifactId>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-mail-client</artifactId>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-redis-client</artifactId>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-junit5</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>${junit-jupiter.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>${junit-jupiter.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.12</version>
      <optional>true</optional>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <release>11</release>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <manifestEntries>
                    <Main-Class>io.vertx.core.Launcher</Main-Class>
                    <Main-Verticle>${main.verticle}</Main-Verticle>
                  </manifestEntries>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>
                </transformer>
              </transformers>
              <artifactSet>
              </artifactSet>
              <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar</outputFile>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>${exec-maven-plugin.version}</version>
        <configuration>
          <mainClass>io.vertx.core.Launcher</mainClass>
          <arguments>
            <argument>run</argument>
            <argument>${main.verticle}</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>sonatype-oss-snapshots</id>
      <name>Sonatype OSSRH Snapshots</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
      <layout>default</layout>
      <releases>
        <enabled>false</enabled>
        <updatePolicy>never</updatePolicy>
      </releases>
      <snapshots>
        <enabled>true</enabled>
        <updatePolicy>never</updatePolicy>
      </snapshots>
    </repository>
  </repositories>
</project>

2 Code

2.1 Main class

import com.yeyeck.vertx.router.ArticleRouter;
import com.yeyeck.vertx.router.UserRouter;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

public class MainVerticle extends AbstractVerticle {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    vertx.deployVerticle(new MainVerticle());
  }

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    // Create a router
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    // Create an HTTP server and hand every request to the router
    vertx.createHttpServer().requestHandler(router).listen(8888, http -> {
      if (http.succeeded()) {
        startPromise.complete();
        System.out.println("HTTP server started on port 8888");
      } else {
        startPromise.fail(http.cause());
      }
    });
    // Mount the URLs on the router
    new ArticleRouter().init(router);
    new UserRouter().init(router);
  }
}

2.2 Router layer

2.2.1 BasicRouter: the base class of all Router classes

First define an abstract class that declares one abstract method. Every router extends this class and implements init(router).

import io.vertx.ext.web.Router;

public abstract class BasicRouter {

  public abstract void init(Router router);
}

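2.2.2 ArticleRouter

A concrete implementation of BasicRouter that exposes simple create, read, update, and delete endpoints. The callback-handling code is nearly identical in every handler yet awkward to factor out. This is the biggest headache, and the whole project looks like this. I am still looking for a way to improve it.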

import com.yeyeck.vertx.router.fo.ArticleFo;
import com.yeyeck.vertx.service.IArticleService;
import com.yeyeck.vertx.service.impl.ArticleServiceImpl;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;

public class ArticleRouter extends BasicRouter {

  private final IArticleService articleService = new ArticleServiceImpl();

  @Override
  public void init(Router router) {
    router.post("/article").handler(this::post);
    router.get("/article/:id").handler(this::get);
    router.put("/article/:id").handler(this::update);
    router.delete("/article/:id").handler(this::deleteArticle);
    router.get("/article/transaction/").handler(this::transaction);
  }

  private void post(RoutingContext routingContext) {
    JsonObject jsonObject = routingContext.getBodyAsJson();
    ArticleFo articleFo = new ArticleFo(jsonObject);
    articleService.addArticle(articleFo).onSuccess(res -> {
      routingContext.response().setStatusCode(200).end(String.valueOf(res));
    }).onFailure(throwable -> {
      routingContext.response().setStatusCode(500).end(throwable.toString());
    });
  }

  private void get(RoutingContext routingContext) {
    Integer id = Integer.parseInt(routingContext.request().getParam("id"));
    articleService.getById(id).onSuccess(article -> {
      routingContext.response().setStatusCode(200).end(article.toJson().toString());
    }).onFailure(throwable -> {
      routingContext.response().setStatusCode(500).end(throwable.toString());
    });
  }

  private void update(RoutingContext routingContext) {
    Integer id = Integer.parseInt(routingContext.request().getParam("id"));
    JsonObject jsonObject = routingContext.getBodyAsJson();
    articleService.update(id, new ArticleFo(jsonObject))
      .onSuccess(res -> {
        routingContext.response().setStatusCode(200).end(String.valueOf(res));
      }).onFailure(throwable -> {
        routingContext.response().setStatusCode(500).end(throwable.toString());
      });
  }

  private void deleteArticle(RoutingContext routingContext) {
    Integer id = Integer.parseInt(routingContext.request().getParam("id"));
    articleService.deleteById(id)
      .onSuccess(res -> {
        routingContext.response().setStatusCode(200).end(String.valueOf(res));
      }).onFailure(throwable -> {
        routingContext.response().setStatusCode(500).end(throwable.toString());
      });
  }

  private void transaction(RoutingContext routingContext) {
    articleService.testTransaction().onSuccess(integer -> {
      routingContext.response().setStatusCode(200).end(String.valueOf(integer));
    }).onFailure(throwable -> {
      routingContext.response().setStatusCode(500).end(throwable.toString());
    });
  }
}
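One possible way to tame the repeated success/failure handling in the handlers above is to map the outcome of a service call to a status code and body in a single place. The sketch below only illustrates that idea and is not part of the demo: `Replies`, `Reply`, and `toReply` are hypothetical names, and the JDK's `CompletableFuture` stands in for the Vert.x `Future` so that the snippet runs without Vert.x on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper; not part of the demo project. */
final class Replies {

  /** Minimal stand-in for an HTTP response: status code plus body. */
  static final class Reply {
    final int status;
    final String body;

    Reply(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  private Replies() {}

  /**
   * Maps the outcome of an async service call to a reply in one place:
   * 200 with the rendered value on success, 500 with the error text on failure.
   * CompletableFuture is an assumption standing in for the Vert.x Future.
   */
  static <T> CompletableFuture<Reply> toReply(CompletableFuture<T> result,
                                              Function<T, String> render) {
    return result.handle((value, error) -> error == null
        ? new Reply(200, render.apply(value))
        : new Reply(500, error.toString()));
  }
}
```

In the router, the same mapping would presumably be applied to the Future returned by the service and then written to routingContext.response(), so each handler shrinks to parsing its input and choosing how to render the result.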

2.3 Service layer

Interface declaration

import com.yeyeck.vertx.pojo.Article;
import com.yeyeck.vertx.router.fo.ArticleFo;
import io.vertx.core.Future;

public interface IArticleService {

  Future<Integer> addArticle(ArticleFo articleFo);

  Future<Article> getById(Integer id);

  Future<Integer> update(Integer id, ArticleFo articleFo);

  Future<Integer> deleteById(Integer id);

  Future<Integer> testTransaction();
}

Interface implementation. Only two methods are shown here: one that needs a transaction and one that does not.

@Override
public Future<Integer> deleteById(Integer id) {
  Promise<Integer> promise = Promise.promise();
  SqlUtil.pool().getConnection().onSuccess(connection -> {
    articleDao.deleteById(connection, id)
      .onSuccess(res -> {
        // SQL executed successfully; release the connection
        connection.close();
        promise.complete(res);
      })
      .onFailure(throwable -> {
        // SQL execution failed; release the connection
        connection.close();
        promise.fail(throwable);
      });
  }).onFailure(promise::fail); // could not get a connection
  return promise.future();
}

// Transaction demo
@Override
public Future<Integer> testTransaction() {
  Promise<Integer> promise = Promise.promise();
  Article article = new Article();
  article.setTitle("transaction");
  article.setAbstractText("transaction");
  article.setContent("transaction");
  article.setId(33);
  SqlUtil.getConnection().onSuccess(connection -> {
    // Begin a transaction
    connection.begin(ar -> {
      if (ar.succeeded()) {
        // Transaction started
        Transaction ts = ar.result();
        // Run the SQL through the dao. Every dao method takes the connection
        // as a parameter precisely so that a transaction can span several calls.
        // compose (not onSuccess) starts each statement only after the
        // previous one has succeeded.
        articleDao.add(connection, article)
          .compose(integer -> articleDao.add(connection, article))
          .compose(integer -> articleDao.add(connection, article))
          .compose(integer -> articleDao.update(connection, article))
          .onSuccess(integer -> {
            // Reached only when every statement succeeded; commit the transaction
            ts.commit(tsar -> {
              if (tsar.succeeded()) {
                promise.complete(1);
              } else {
                promise.fail(tsar.cause());
              }
              connection.close();
            });
          }).onFailure(throwable -> {
            // A statement failed; nothing is committed
            promise.fail(throwable);
            connection.close();
          });
      } else {
        // Could not start the transaction; close the connection
        promise.fail(ar.cause());
        connection.close();
      }
    });
  }).onFailure(promise::fail); // could not get a connection
  return promise.future();
}
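The transaction demo relies on the statements running strictly one after another, with the commit reached only when all of them succeed. In Vert.x that sequencing is what Future.compose provides, whereas onSuccess only registers a side-effect handler on the same future and does not wait for what the handler starts. The sketch below shows just the control flow with the JDK's `CompletableFuture.thenCompose` as a stand-in; `TxSketch` and `run` are hypothetical names, and no database is involved.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch; CompletableFuture stands in for the Vert.x Future. */
final class TxSketch {

  private TxSketch() {}

  /**
   * Runs the steps strictly one after another. A step starts only when the
   * previous one has succeeded; the first failure skips all remaining steps.
   * Returns "commit" when every step succeeded, otherwise "rollback".
   */
  static CompletableFuture<String> run(List<Supplier<CompletableFuture<Integer>>> steps) {
    CompletableFuture<Integer> chain = CompletableFuture.completedFuture(0);
    for (Supplier<CompletableFuture<Integer>> step : steps) {
      // thenCompose waits for the previous stage, then starts the next step
      chain = chain.thenCompose(previous -> step.get());
    }
    // Decide the outcome once, after the whole chain has finished
    return chain.handle((last, error) -> error == null ? "commit" : "rollback");
  }
}
```

Each Supplier plays the role of one dao call on the shared connection. When a step fails, the later suppliers are never invoked, which mirrors the statements after a failed SQL never being issued.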

2.4 Dao layer

A connection pool that manages SqlConnection instances

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlConnection;

public class SqlUtil {

  private static final MySQLPool pool;

  static {
    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setHost("127.0.0.1")
      .setUser("root")
      .setPassword("password")
      .setPort(3306)
      .setDatabase("vertx");
    PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
    // Note: this creates a Vertx instance separate from the one in MainVerticle
    pool = MySQLPool.pool(Vertx.vertx(), connectOptions, poolOptions);
  }

  private SqlUtil() {}

  public static MySQLPool pool() {
    return pool;
  }

  public static Future<SqlConnection> getConnection() {
    Promise<SqlConnection> promise = Promise.promise();
    pool.getConnection(ar -> {
      if (ar.succeeded()) {
        promise.complete(ar.result());
      } else {
        ar.cause().printStackTrace();
        promise.fail(ar.cause());
      }
    });
    return promise.future();
  }
}

Prepare the data

CREATE DATABASE /*!32312 IF NOT EXISTS*/`vertx` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_bin */;

USE `vertx`;

/*Table structure for table `t_article` */

DROP TABLE IF EXISTS `t_article`;

CREATE TABLE `t_article` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(100) COLLATE utf8mb4_bin NOT NULL,
  `abstract_text` varchar(200) COLLATE utf8mb4_bin NOT NULL,
  `content` mediumtext COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=29 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

/*Data for the table `t_article` */

insert into `t_article`(`id`,`title`,`abstract_text`,`content`) values
  (1,'title 2','abstract 1','content123 1'),
  (2,'transaction','transaction','transaction'),
  (3,'transaction','transaction','transaction');

Declare the interface

import com.yeyeck.vertx.pojo.Article;
import io.vertx.core.Future;
import io.vertx.sqlclient.SqlConnection;

public interface IArticleDao {

  Future<Integer> add(SqlConnection connection, Article article);

  Future<Article> getById(SqlConnection connection, Integer id);

  Future<Integer> update(SqlConnection connection, Article article);

  Future<Integer> deleteById(SqlConnection connection, Integer id);
}

Implement CRUD

import com.yeyeck.vertx.dao.IArticleDao;
import com.yeyeck.vertx.pojo.Article;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.mysqlclient.MySQLClient;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Tuple;

public class ArticleDaoImpl implements IArticleDao {

  @Override
  public Future<Integer> add(SqlConnection connection, Article article) {
    Promise<Integer> promise = Promise.promise();
    String sql = "insert into t_article(title, abstract_text, content) values (?, ?, ?)";
    Tuple params = Tuple.of(article.getTitle(), article.getAbstractText(), article.getContent());
    connection.preparedQuery(sql)
      .execute(params, ar -> {
        if (ar.succeeded()) {
          RowSet<Row> rows = ar.result();
          // Complete with the auto-generated id of the inserted row
          Long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
          promise.complete(lastInsertId.intValue());
        } else {
          promise.fail(ar.cause());
        }
      });
    return promise.future();
  }

  @Override
  public Future<Article> getById(SqlConnection connection, Integer id) {
    Promise<Article> promise = Promise.promise();
    String sql = "select id, title, abstract_text, content from t_article where id = ?";
    Tuple params = Tuple.of(id);
    connection.preparedQuery(sql)
      .execute(params, ar -> {
        if (ar.succeeded()) {
          RowSet<Row> rows = ar.result();
          if (rows.iterator().hasNext()) {
            promise.complete(new Article(rows.iterator().next()));
          } else {
            // No row with this id; fail instead of throwing inside the callback
            promise.fail("article not found: " + id);
          }
        } else {
          promise.fail(ar.cause());
        }
      });
    return promise.future();
  }

  @Override
  public Future<Integer> update(SqlConnection connection, Article article) {
    Promise<Integer> promise = Promise.promise();
    String sql = "update t_article set title = ?, abstract_text = ?, content = ? where id = ?";
    Tuple params = Tuple.of(article.getTitle(), article.getAbstractText(), article.getContent(), article.getId());
    connection.preparedQuery(sql)
      .execute(params, ar -> {
        if (ar.succeeded()) {
          promise.complete(1);
        } else {
          promise.fail(ar.cause());
        }
      });
    return promise.future();
  }

  @Override
  public Future<Integer> deleteById(SqlConnection connection, Integer id) {
    Promise<Integer> promise = Promise.promise();
    String sql = "delete from t_article where id = ?";
    Tuple params = Tuple.of(id);
    connection.preparedQuery(sql)
      .execute(params, ar -> {
        if (ar.succeeded()) {
          promise.complete(1);
        } else {
          promise.fail(ar.cause());
        }
      });
    return promise.future();
  }
}

2.5 Pojo

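The only unusual part is that the conversion between the database result Row and JsonObject is written by hand. Vert.x seems to expect us to do this ourselves and makes no attempt to use reflection for it.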

import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import lombok.Data;

@Data
public class Article {

  private Integer id;
  private String title;
  private String abstractText;
  private String content;

  public Article() {}

  public Article(Row row) {
    this.id = row.getInteger("id");
    this.title = row.getString("title");
    this.content = row.getString("content");
    this.abstractText = row.getString("abstract_text");
  }

  public JsonObject toJson() {
    return new JsonObject().put("id", this.id)
      .put("title", this.title)
      .put("abstractText", this.abstractText)
      .put("content", content);
  }
}

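3 Testing the code

First run the main class MainVerticle
[Screenshot: MainVerticle running]
The data currently in the database
[Screenshot: current rows in the database]
Then test with Postman
GET localhost:8888/article/1
[Screenshot: response in Postman]
POST localhost:8888/article
[Screenshot: response in Postman]
Test the transaction
GET localhost:8888/article/transaction/
[Screenshot: response in Postman]
Change the SQL statement of the update method to something invalid, for example as follows, then restart and test the transaction endpoint again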

String sql = "update xxxt_article set title = ?, abstract_text = ?, content = ? where id = ?";

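[Screenshot: response in Postman]
No new rows were inserted into the database either, which shows that the transaction was not committed.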
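The GET requests from the Postman session can also be issued from Java 11's built-in HttpClient, which is handy for repeating the check after each change. This is only a sketch: `ArticleApiProbe` and its methods are hypothetical names, the base URL assumes the server from MainVerticle is running locally on port 8888, and the POST is left out because the fields of ArticleFo are not shown in this post.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for poking the demo endpoints; not part of the project. */
final class ArticleApiProbe {

  // Assumption: MainVerticle is running locally on port 8888
  static final String BASE = "http://localhost:8888";

  private ArticleApiProbe() {}

  /** Builds GET /article/{id}. */
  static HttpRequest getArticle(int id) {
    return HttpRequest.newBuilder(URI.create(BASE + "/article/" + id)).GET().build();
  }

  /** Builds GET /article/transaction/, the transaction test endpoint. */
  static HttpRequest runTransaction() {
    return HttpRequest.newBuilder(URI.create(BASE + "/article/transaction/")).GET().build();
  }

  /** Sends the request and returns "status body"; requires the server to be up. */
  static String send(HttpRequest request) throws IOException, InterruptedException {
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    return response.statusCode() + " " + response.body();
  }
}
```

Calling ArticleApiProbe.send(ArticleApiProbe.runTransaction()) before and after breaking the update SQL should show the status switching between the 200 and 500 branches of the router.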

4 Summary

Because Vert.x is programmed asynchronously, the key is to use Future and Promise to handle the callbacks.
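The pattern the dao layer repeats is: create a Promise, complete or fail it inside the callback, and hand the Future back to the caller. Here is a minimal sketch of that bridge using only the JDK, where a single `CompletableFuture` plays both the Promise (write side) and Future (read side) roles. `CallbackBridge`, `CallbackQuery`, and `asFuture` are hypothetical names, and `CallbackQuery` is an invented stand-in for a callback-style query API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical sketch; CompletableFuture stands in for Vert.x Promise and Future. */
final class CallbackBridge {

  /** Invented callback-style API: reports (result, error) to the callback. */
  interface CallbackQuery {
    void execute(String sql, BiConsumer<Integer, Throwable> callback);
  }

  private CallbackBridge() {}

  /** Wraps one callback-style call so that callers receive a future instead. */
  static CompletableFuture<Integer> asFuture(CallbackQuery query, String sql) {
    CompletableFuture<Integer> promise = new CompletableFuture<>();
    query.execute(sql, (rows, error) -> {
      if (error == null) {
        promise.complete(rows);               // like promise.complete(...)
      } else {
        promise.completeExceptionally(error); // like promise.fail(...)
      }
    });
    return promise;                           // like promise.future()
  }
}
```

Once the callback is wrapped like this at the lowest layer, the service and router layers only ever deal with futures, which is what lets them be chained.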

updated@2020-11-02