KeyCloak持久化机制-简述

在研究KeyCloak的部分接口功能的时候,发现有相当一部分的接口代码没有直接做持久化,但是在接口调用完成后,确实数据又被持久化到了数据库之中,因此花了一些时间来研究了一下他是怎么做持久化的。

业务逻辑接口-示例

RoleByIdResource 接口

@Path("{role-id}/composites")
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Tag(name = KeycloakOpenAPI.Admin.Tags.ROLES_BY_ID)
@Operation(summary = "Make the role a composite role by associating some child roles")
@APIResponses(value = {
    @APIResponse(responseCode = "204", description = "No Content"),
    @APIResponse(responseCode = "403", description = "Forbidden")
})
public void addComposites(final @PathParam("role-id") String id, List<RoleRepresentation> roles) {
    RoleModel role = getRoleModel(id);
    // 验证当前角色是否有权限管理该角色
    auth.roles().requireManage(role);
    // 调用方法做 复合处理
    addComposites(auth, adminEvent, session.getContext().getUri(), roles, role);
}

RoleResource 处理方法

protected void addComposites(AdminPermissionEvaluator auth, AdminEventBuilder adminEvent, UriInfo uriInfo, List<RoleRepresentation> roles, RoleModel role) {
    // 遍历子角色列表
    for (RoleRepresentation rep : roles) {
        // 检查是否存在id
        if (rep.getId() == null) throw new NotFoundException("Could not find composite role");
        // 根据角色id找出角色对象
        RoleModel composite = realm.getRoleById(rep.getId());

        if (composite == null) {
            throw new NotFoundException("Could not find composite role");
        }
        // 验证用户是否有权限符合该角色
        auth.roles().requireMapComposite(composite);
        // 添加复合角色关系
        role.addCompositeRole(composite);
    }
    // Admin审计事件记录操作
    if (role.isClientRole()) {
        adminEvent.resource(ResourceType.CLIENT_ROLE);
    } else {
        adminEvent.resource(ResourceType.REALM_ROLE);
    }

    adminEvent.operation(OperationType.CREATE).resourcePath(uriInfo).representation(roles).success();
}

RoleAdapter 缓存层处理

实际上做组合处理会先调用 KeyCloak 的缓存装饰器 org.keycloak.models.cache.infinispan.RoleAdapter
在调用getRoleById查询的角色的时候,会构建该装饰器 RealmCacheSession getRoleById 方法

protected RoleModel updated;
private final Supplier<RoleModel> modelSupplier;

public RoleAdapter(CachedRole cached, RealmCacheSession cacheSession, RealmModel realm) {
    this.cached = cached;
    this.cacheSession = cacheSession;
    this.session = cacheSession.session;
    this.realm = realm;
    this.modelSupplier = this::getRoleModel;
}
private RoleModel getRoleModel() {
    // 这里的做了缓存层到JPA层的桥接 - 获取了 JPA的RoleAdapter 实现
    return cacheSession.getRoleDelegate().getRoleById(realm, cached.getId());
}
protected void getDelegateForUpdate() {
    if (updated == null) {
        // 注册缓存失效
        cacheSession.registerRoleInvalidation(cached.getId(), cached.getName(), getContainerId());
        // 获取真正的处理对象
        updated = modelSupplier.get();
        if (updated == null) throw new IllegalStateException("Not found in database");
    }
}

@Override
public void addCompositeRole(RoleModel role) {
    getDelegateForUpdate();
    // 委托处理JPA的 RoleAdapter 做处理
    updated.addCompositeRole(role);
}

RoleAdapter Jpa层处理

protected RoleEntity role;
protected EntityManager em;
protected RealmModel realm;
protected KeycloakSession session;

public RoleAdapter(KeycloakSession session, RealmModel realm, EntityManager em, RoleEntity role) {
    this.em = em;
    this.realm = realm;
    this.role = role;
    this.session = session;
}

@Override
public void addCompositeRole(RoleModel role) {
    // 将RoleModel对象转为 RoleEntity - 这里的 Role 是被组合的对象
    RoleEntity entity = toRoleEntity(role);
    // 检查被组合 role 之中是否已经存在该角色
    for (RoleEntity composite : getEntity().getCompositeRoles()) {
        if (composite.equals(entity)) return;
    }
    // 将被组合的 role 添加到 主体 role 的组合role之中
    getEntity().getCompositeRoles().add(entity);
}

private RoleEntity toRoleEntity(RoleModel model) {
    if (model instanceof RoleAdapter) {
        return ((RoleAdapter) model).getEntity();
    }
    // 如果当前不是 RoleAdapter 那么用JPA 获取代理引用,(在实际使用时再做懒加载)
    return em.getReference(RoleEntity.class, model.getId());
}

显然的,如果我们只观察源码,很明显在这里其实并没有对数据做持久化操作,也看不太出来他是怎么做的持久化,但KeyCloak毕竟用的是JPA,即使没有持久化的代码,我们还是可以先去看看一些间接的证据,我们可以看到 compositeRoles 的这一成员属性,它被标注了与 COMPOSITE_ROLE 有关,并且指明了对应的列。

RoleEntity 实体类定义

@Entity
//@DynamicInsert
//@DynamicUpdate
@Table(name="KEYCLOAK_ROLE", uniqueConstraints = {
        @UniqueConstraint(columnNames = { "NAME", "CLIENT_REALM_CONSTRAINT" })
})
@NamedQueries({
        @NamedQuery(name="getClientRoles", query="select role from RoleEntity role where role.clientId = :client order by role.name"),
        @NamedQuery(name="getClientRoleIds", query="select role.id from RoleEntity role where role.clientId = :client"),
        @NamedQuery(name="getClientRoleByName", query="select role from RoleEntity role where role.name = :name and role.clientId = :client"),
        @NamedQuery(name="getClientRoleIdByName", query="select role.id from RoleEntity role where role.name = :name and role.clientId = :client"),
        @NamedQuery(name="searchForClientRoles", query="select role from RoleEntity role where role.clientId = :client and ( lower(role.name) like :search or lower(role.description) like :search ) order by role.name"),
        @NamedQuery(name="getRealmRoles", query="select role from RoleEntity role where role.clientRole = false and role.realmId = :realm order by role.name"),
        @NamedQuery(name="getRealmRoleIds", query="select role.id from RoleEntity role where role.clientRole = false and role.realmId = :realm"),
        @NamedQuery(name="getRealmRoleByName", query="select role from RoleEntity role where role.clientRole = false and role.name = :name and role.realmId = :realm"),
        @NamedQuery(name="getRealmRoleIdByName", query="select role.id from RoleEntity role where role.clientRole = false and role.name = :name and role.realmId = :realm"),
        @NamedQuery(name="searchForRealmRoles", query="select role from RoleEntity role where role.clientRole = false and role.realmId = :realm and ( lower(role.name) like :search or lower(role.description) like :search ) order by role.name"),
        @NamedQuery(name="getRoleIdsFromIdList", query="select role.id from RoleEntity role where role.realmId = :realm and role.id in :ids order by role.name ASC"),
        @NamedQuery(name="getRoleIdsByNameContainingFromIdList", query="select role.id from RoleEntity role where role.realmId = :realm and lower(role.name) like lower(concat('%',:search,'%')) and role.id in :ids order by role.name ASC"),
})

public class RoleEntity {
    ······
    // JoinTable 以及 joinColumns 决定了持久化的效果
    @ManyToMany(fetch = FetchType.LAZY, cascade = {})
    @JoinTable(name = "COMPOSITE_ROLE", joinColumns = @JoinColumn(name = "COMPOSITE"), inverseJoinColumns = @JoinColumn(name = "CHILD_ROLE"))
    private Set<RoleEntity> compositeRoles;
    ······
}

事务持久化处理

UndertowRequestFilter

显然,只能debug来看具体发生什么了,在debug后发现答案其实藏在 keycloak 对请求的处理逻辑之中,我们的每一个请求其实都会经一个叫做 UndertowRequestFilter 的过滤器,并由它来调用 KeycloakModelUtils 将 HTTP 请求包装在事务之中进行处理。

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
    throws UnsupportedEncodingException {
    servletRequest.setCharacterEncoding("UTF-8");
    final HttpServletRequest request = (HttpServletRequest) servletRequest;

    ClientConnection connection = createClientConnection(request);
    // 请求在分配到对应的接口处理之前,会被 Filter 的该方法拦截,并将后续过滤器链及接口调用过程包装在事务中
    KeycloakModelUtils.runJobInTransaction(factory, session -> {
        try {
            ResteasyContext.pushContext(KeycloakSession.class, session);
            session.getContext().setConnection(connection);
            filterChain.doFilter(servletRequest, servletResponse);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}

显然实际调用处理的是 runJobInTransaction 方法,而它是位于 KeycloakModelUtils 中的方法,在其调用的方法中,也是直接对其做事务调用处理。

KeycloakModelUtils

/**
 * Wrap given runnable job into KeycloakTransaction.
 * @param factory The session factory to use
 * @param task The task to execute
 */
public static void runJobInTransaction(KeycloakSessionFactory factory, KeycloakSessionTask task) {
    runJobInTransaction(factory, null, task);
}
/**
 * Wrap given runnable job into KeycloakTransaction.
 * @param factory The session factory to use
 * @param context The context from the previous session
 * @param task The task to execute
 */
public static void runJobInTransaction(KeycloakSessionFactory factory, KeycloakContext context, KeycloakSessionTask task) {
    runJobInTransactionWithResult(factory, context, session -> {
        task.run(session);
        return null;
    }, task.useExistingSession(), task.getTaskName());
}

/**
 * Wrap a given callable job into a KeycloakTransaction.
 * @param <V> The type for the result
 * @param factory The session factory   ----   KeycloakSession的工厂
 * @param context The context from the previous session to use ----  如果复用现有的会话,这里会有值,但从调用链来看,我们的HTTP请求在这里会是 NULL
 * @param callable The callable to execute   ---- 设置上下文,设定调用接口信息后,继续调用过滤器链
 * @param useExistingSession if the existing session should be used ---- 在我的示例调用中,该值为 `false`
 * @param taskName Name of the task. Can be useful for logging purposes ---- 在我的示例中,该值为 `Non-HTTP task`
 * @return The return value from the callable
 */
public static <V> V runJobInTransactionWithResult(KeycloakSessionFactory factory, KeycloakContext context, final KeycloakSessionTaskWithResult<V> callable,boolean useExistingSession, String taskName) {
    V result;
    KeycloakSession existing = KeycloakSessionUtil.getKeycloakSession();
    if (useExistingSession && existing != null && existing.getTransactionManager().isActive()) {
        return callable.run(existing);
    }
    try (KeycloakSession session = factory.create()) {
        RequestContextHelper.getContext(session).setContextMessage(taskName);
        session.getTransactionManager().begin();
        KeycloakSessionUtil.setKeycloakSession(session);
        try {
            cloneContextRealmClientToSession(context, session);
            // 这里调用的 Callable 实际上调用的就是 UndertowRequestFilter 中 dofilter 的实现
            result = callable.run(session);
        } catch (Throwable t) {
            session.getTransactionManager().setRollbackOnly();
            throw t;
        } finally {
            KeycloakSessionUtil.setKeycloakSession(existing);
        }
    }
    return result;
}

到这里原因就明显很多了,其实持久化的实现是依托于 JPA 的脏数据自动跟踪持久化实现的。大致流程如下:

dofilter 后续其实就是将 undertow 的过滤器链走一遍,并实际调用到接口中,结果很显然接口调用的整个处理逻辑其实都是都被 KeycloakModelUtilsrunJobInTransactionWithResult 的事务包裹。在事务提交的时候,em.getReference 获取的 RoleEntity 被修改形成 JPA 的脏数据,生成需要执行的SQL,并完成持久化。

优点

  • 简化开发:直接将HTTP请求作为事务控制的粒度来实现数据库的持久化处理来看,对于业务逻辑的开发人员来说,开发变为尤为简单也不需要考虑内存改动落入磁盘的过程。
  • 统一事务控制:对于开发人员来说,开发接口时甚至都不需要在意事务控制的问题,对查出的数据的成员属性做设置即可实现修改
  • 充分利用JPA脏检查、一级缓存支持

缺点

  • 长事务风险:直接将网络请求本身作为事务的粒度,无可避免的会出现潜在的复杂业务需求导致数据库等资源长期锁定,从而出现潜在的阻塞问题。
  • 连接池耗尽:在并发程度较高的场景下,以请求作为单位开启事务,在配合事务处理时间较长这一可能,连接池中连接数量被耗尽的可能其实相当高
  • 事务粒度过大:对于批量处理的接口(如果有的话),会对内存和事务日志的控制本身带来压力,并且由于事务粒度过大其中业务处理中任一一个持久化逻辑出现错误,都会导致整个操作全部回滚,如果事务处理的逻辑很多,回滚带来的花销也是相当大的。