feat: enhance webhook worker reliability and event pagination (#5344)

This commit is contained in:
Paperlz
2026-03-31 09:51:24 +08:00
committed by GitHub
parent bb11511029
commit 0ff862dbc5
7 changed files with 137 additions and 71 deletions

View File

@@ -24,6 +24,38 @@ import (
const defaultWebhookEventListLimit = 100
func (c *ApiController) getScopedWebhookEventQuery() (string, string, bool) {
organization, ok := c.RequireAdmin()
if !ok {
return "", "", false
}
owner := ""
if c.IsGlobalAdmin() {
owner = c.Ctx.Input.Query("owner")
requestedOrganization := c.Ctx.Input.Query("organization")
if requestedOrganization != "" {
organization = requestedOrganization
}
}
return owner, organization, true
}
func (c *ApiController) checkWebhookEventAccess(event *object.WebhookEvent, organization string) bool {
if event == nil || c.IsGlobalAdmin() {
return true
}
if event.Organization != organization {
c.ResponseError(c.T("auth:Unauthorized operation"))
return false
}
return true
}
// GetWebhookEvents
// @Title GetWebhookEvents
// @Tag Webhook Event API
@@ -35,12 +67,16 @@ const defaultWebhookEventListLimit = 100
// @Success 200 {array} object.WebhookEvent The Response object
// @router /get-webhook-events [get]
func (c *ApiController) GetWebhookEvents() {
owner := c.Ctx.Input.Query("owner")
organization := c.Ctx.Input.Query("organization")
owner, organization, ok := c.getScopedWebhookEventQuery()
if !ok {
return
}
webhookName := c.Ctx.Input.Query("webhookName")
status := c.Ctx.Input.Query("status")
limit := c.Ctx.Input.Query("pageSize")
page := c.Ctx.Input.Query("p")
sortField := c.Ctx.Input.Query("sortField")
sortOrder := c.Ctx.Input.Query("sortOrder")
if limit != "" && page != "" {
limit := util.ParseInt(limit)
@@ -51,7 +87,7 @@ func (c *ApiController) GetWebhookEvents() {
}
paginator := pagination.NewPaginator(c.Ctx.Request, limit, count)
events, err := object.GetWebhookEvents(owner, organization, webhookName, object.WebhookEventStatus(status), paginator.Offset(), limit)
events, err := object.GetWebhookEvents(owner, organization, webhookName, object.WebhookEventStatus(status), paginator.Offset(), limit, sortField, sortOrder)
if err != nil {
c.ResponseError(err.Error())
return
@@ -59,7 +95,7 @@ func (c *ApiController) GetWebhookEvents() {
c.ResponseOk(events, paginator.Nums())
} else {
events, err := object.GetWebhookEvents(owner, organization, webhookName, object.WebhookEventStatus(status), 0, defaultWebhookEventListLimit)
events, err := object.GetWebhookEvents(owner, organization, webhookName, object.WebhookEventStatus(status), 0, defaultWebhookEventListLimit, sortField, sortOrder)
if err != nil {
c.ResponseError(err.Error())
return
@@ -77,6 +113,11 @@ func (c *ApiController) GetWebhookEvents() {
// @Success 200 {object} object.WebhookEvent The Response object
// @router /get-webhook-event-detail [get]
func (c *ApiController) GetWebhookEvent() {
organization, ok := c.RequireAdmin()
if !ok {
return
}
id := c.Ctx.Input.Query("id")
event, err := object.GetWebhookEvent(id)
@@ -85,6 +126,10 @@ func (c *ApiController) GetWebhookEvent() {
return
}
if !c.checkWebhookEventAccess(event, organization) {
return
}
c.ResponseOk(event)
}
@@ -96,43 +141,30 @@ func (c *ApiController) GetWebhookEvent() {
// @Success 200 {object} controllers.Response The Response object
// @router /replay-webhook-event [post]
func (c *ApiController) ReplayWebhookEvent() {
organization, ok := c.RequireAdmin()
if !ok {
return
}
id := c.Ctx.Input.Query("id")
err := object.ReplayWebhookEvent(id)
event, err := object.GetWebhookEvent(id)
if err != nil {
c.ResponseError(err.Error())
return
}
c.ResponseOk("Webhook event replayed successfully")
}
if !c.checkWebhookEventAccess(event, organization) {
return
}
// ReplayWebhookEvents
// @Title ReplayWebhookEvents
// @Tag Webhook Event API
// @Description replay multiple webhook events
// @Param owner query string false "The owner of webhook events"
// @Param organization query string false "The organization"
// @Param webhookName query string false "The webhook name"
// @Param status query string false "Event status to replay (e.g., failed)"
// @Success 200 {object} controllers.Response The Response object
// @router /replay-webhook-events [post]
func (c *ApiController) ReplayWebhookEvents() {
owner := c.Ctx.Input.Query("owner")
organization := c.Ctx.Input.Query("organization")
webhookName := c.Ctx.Input.Query("webhookName")
status := c.Ctx.Input.Query("status")
count, err := object.ReplayWebhookEvents(owner, organization, webhookName, object.WebhookEventStatus(status))
err = object.ReplayWebhookEvent(id)
if err != nil {
c.ResponseError(err.Error())
return
}
c.ResponseOk(map[string]interface{}{
"count": count,
"message": "webhook events replayed successfully",
})
c.ResponseOk("Webhook event replay triggered")
}
// DeleteWebhookEvent
@@ -143,6 +175,11 @@ func (c *ApiController) ReplayWebhookEvents() {
// @Success 200 {object} controllers.Response The Response object
// @router /delete-webhook-event [post]
func (c *ApiController) DeleteWebhookEvent() {
organization, ok := c.RequireAdmin()
if !ok {
return
}
var event object.WebhookEvent
err := json.Unmarshal(c.Ctx.Input.RequestBody, &event)
if err != nil {
@@ -150,6 +187,16 @@ func (c *ApiController) DeleteWebhookEvent() {
return
}
storedEvent, err := object.GetWebhookEvent(event.GetId())
if err != nil {
c.ResponseError(err.Error())
return
}
if !c.checkWebhookEventAccess(storedEvent, organization) {
return
}
c.Data["json"] = wrapActionResponse(object.DeleteWebhookEvent(&event))
c.ServeJSON()
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/casdoor/casdoor/util"
"github.com/xorm-io/core"
"github.com/xorm-io/xorm"
)
// WebhookEventStatus represents the delivery status of a webhook event
@@ -85,9 +86,8 @@ func getWebhookEvent(owner string, name string) (*WebhookEvent, error) {
return nil, nil
}
func GetWebhookEvents(owner, organization, webhookName string, status WebhookEventStatus, offset, limit int) ([]*WebhookEvent, error) {
events := []*WebhookEvent{}
session := ormer.Engine.Desc("created_time")
func getWebhookEventSession(owner, organization, webhookName string, status WebhookEventStatus, offset, limit int, sortField, sortOrder string) *xorm.Session {
session := ormer.Engine.Prepare()
if owner != "" {
session = session.Where("owner = ?", owner)
@@ -108,6 +108,22 @@ func GetWebhookEvents(owner, organization, webhookName string, status WebhookEve
session = session.Limit(limit)
}
if sortField == "" || sortOrder == "" {
sortField = "created_time"
}
if sortOrder == "ascend" {
session = session.Asc(util.SnakeString(sortField))
} else {
session = session.Desc(util.SnakeString(sortField))
}
return session
}
func GetWebhookEvents(owner, organization, webhookName string, status WebhookEventStatus, offset, limit int, sortField, sortOrder string) ([]*WebhookEvent, error) {
events := []*WebhookEvent{}
session := getWebhookEventSession(owner, organization, webhookName, status, offset, limit, sortField, sortOrder)
err := session.Find(&events)
if err != nil {
return nil, err
@@ -196,6 +212,10 @@ func UpdateWebhookEventStatus(event *WebhookEvent, status WebhookEventStatus, st
event.LastResponse = response
event.UpdatedTime = util.GetCurrentTime()
if status != WebhookEventStatusRetrying {
event.NextRetryTime = ""
}
if err != nil {
event.LastError = err.Error()
} else {

View File

@@ -19,12 +19,13 @@ import (
"net/http"
"reflect"
"strings"
"time"
"github.com/casdoor/casdoor/util"
)
func sendWebhook(webhook *Webhook, record *Record, extendedUser *User) (int, string, error) {
client := &http.Client{}
client := &http.Client{Timeout: 30 * time.Second}
userMap := make(map[string]interface{})
var body io.Reader

View File

@@ -20,6 +20,7 @@ import (
"sync"
"time"
"github.com/beego/beego/v2/core/logs"
"github.com/casdoor/casdoor/util"
)
@@ -91,7 +92,7 @@ func StopWebhookDeliveryWorker() {
func processWebhookEvents() {
events, err := GetPendingWebhookEvents(webhookBatchSize)
if err != nil {
fmt.Printf("Error getting pending webhook events: %v\n", err)
logs.Error(fmt.Sprintf("failed to get pending webhook events: %v", err))
return
}
@@ -105,7 +106,7 @@ func deliverWebhookEvent(event *WebhookEvent) {
// Get the webhook configuration
webhook, err := GetWebhook(event.WebhookName)
if err != nil {
fmt.Printf("Error getting webhook %s: %v\n", event.WebhookName, err)
logs.Error(fmt.Sprintf("failed to get webhook %s: %v", event.WebhookName, err))
return
}
@@ -118,7 +119,8 @@ func deliverWebhookEvent(event *WebhookEvent) {
}
if !webhook.IsEnabled {
// Webhook is disabled, skip for now
// Disabled webhooks should finalize the event to avoid hot-looping forever.
UpdateWebhookEventStatus(event, WebhookEventStatusFailed, 0, "", fmt.Errorf("webhook is disabled"))
return
}
@@ -138,7 +140,7 @@ func deliverWebhookEvent(event *WebhookEvent) {
extendedUser = &User{}
err = json.Unmarshal([]byte(event.ExtendedUser), extendedUser)
if err != nil {
fmt.Printf("Error parsing extended user: %v\n", err)
logs.Warning(fmt.Sprintf("failed to parse extended user for webhook event %s: %v", event.GetId(), err))
extendedUser = nil
}
}
@@ -243,21 +245,3 @@ func ReplayWebhookEvent(eventId string) error {
return nil
}
// ReplayWebhookEvents replays multiple webhook events matching the criteria
func ReplayWebhookEvents(owner, organization, webhookName string, status WebhookEventStatus) (int, error) {
events, err := GetWebhookEvents(owner, organization, webhookName, status, 0, 0)
if err != nil {
return 0, err
}
count := 0
for _, event := range events {
err = ReplayWebhookEvent(event.GetId())
if err == nil {
count++
}
}
return count, nil
}

View File

@@ -315,7 +315,6 @@ func InitAPI() {
web.Router("/api/get-webhook-events", &controllers.ApiController{}, "GET:GetWebhookEvents")
web.Router("/api/get-webhook-event-detail", &controllers.ApiController{}, "GET:GetWebhookEvent")
web.Router("/api/replay-webhook-event", &controllers.ApiController{}, "POST:ReplayWebhookEvent")
web.Router("/api/replay-webhook-events", &controllers.ApiController{}, "POST:ReplayWebhookEvents")
web.Router("/api/delete-webhook-event", &controllers.ApiController{}, "POST:DeleteWebhookEvent")
web.Router("/api/get-tickets", &controllers.ApiController{}, "GET:GetTickets")

View File

@@ -28,6 +28,9 @@ class WebhookEventListPage extends React.Component {
loading: false,
replayingId: "",
isAuthorized: true,
statusFilter: "",
sortField: "",
sortOrder: "",
detailShow: false,
detailRecord: null,
pagination: {
@@ -54,7 +57,7 @@ class WebhookEventListPage extends React.Component {
...this.state.pagination,
current: 1,
};
this.fetchWebhookEvents(pagination);
this.fetchWebhookEvents(pagination, this.state.statusFilter, this.state.sortField, this.state.sortOrder);
};
getStatusTag = (status) => {
@@ -94,16 +97,19 @@ class WebhookEventListPage extends React.Component {
return Setting.isDefaultOrganizationSelected(this.props.account) ? "" : Setting.getRequestOrganization(this.props.account);
};
fetchWebhookEvents = (pagination = this.state.pagination) => {
fetchWebhookEvents = (pagination = this.state.pagination, statusFilter = this.state.statusFilter, sortField = this.state.sortField, sortOrder = this.state.sortOrder) => {
this.setState({loading: true});
WebhookEventBackend.getWebhookEvents("", this.getOrganizationFilter(), pagination.current, pagination.pageSize)
WebhookEventBackend.getWebhookEvents("", this.getOrganizationFilter(), pagination.current, pagination.pageSize, "", statusFilter, sortField, sortOrder)
.then((res) => {
this.setState({loading: false});
if (res.status === "ok") {
this.setState({
data: res.data || [],
statusFilter,
sortField,
sortOrder,
pagination: {
...pagination,
total: res.data2 ?? 0,
@@ -130,10 +136,10 @@ class WebhookEventListPage extends React.Component {
this.setState({replayingId: ""});
if (res.status === "ok") {
Setting.showMessage("success", typeof res.data === "string" ? res.data : i18next.t("webhook:Webhook event replayed successfully"));
this.fetchWebhookEvents(this.state.pagination);
Setting.showMessage("success", typeof res.data === "string" ? res.data : i18next.t("webhook:Webhook event replay triggered"));
this.fetchWebhookEvents(this.state.pagination, this.state.statusFilter, this.state.sortField, this.state.sortOrder);
} else {
Setting.showMessage("error", `${i18next.t("general:Failed to save")}: ${res.msg}`);
Setting.showMessage("error", `${i18next.t("webhook:Failed to replay webhook event")}: ${res.msg}`);
}
})
.catch((error) => {
@@ -142,8 +148,16 @@ class WebhookEventListPage extends React.Component {
});
};
handleTableChange = (pagination) => {
this.fetchWebhookEvents(pagination);
handleTableChange = (pagination, filters, sorter) => {
const statusFilter = Array.isArray(filters?.status) ? (filters.status[0] ?? "") : (filters?.status ?? "");
const sortField = Array.isArray(sorter) ? "" : sorter?.field ?? "";
const sortOrder = Array.isArray(sorter) ? "" : sorter?.order ?? "";
const nextPagination = statusFilter !== this.state.statusFilter ? {
...pagination,
current: 1,
} : pagination;
this.fetchWebhookEvents(nextPagination, statusFilter, sortField, sortOrder);
};
openDetailDrawer = (record) => {
@@ -207,7 +221,8 @@ class WebhookEventListPage extends React.Component {
{text: i18next.t("webhook:Failed"), value: "failed"},
{text: i18next.t("webhook:Retrying"), value: "retrying"},
],
onFilter: (value, record) => record.status === value,
filterMultiple: false,
filteredValue: this.state.statusFilter ? [this.state.statusFilter] : null,
render: (text) => this.getStatusTag(text),
},
{
@@ -215,18 +230,16 @@ class WebhookEventListPage extends React.Component {
dataIndex: "attemptCount",
key: "attemptCount",
width: 140,
sorter: (a, b) => (a.attemptCount || 0) - (b.attemptCount || 0),
sorter: true,
sortOrder: this.state.sortField === "attemptCount" ? this.state.sortOrder : null,
},
{
title: i18next.t("webhook:Next Retry Time"),
dataIndex: "nextRetryTime",
key: "nextRetryTime",
width: 180,
sorter: (a, b) => {
const timeA = a.nextRetryTime ? new Date(a.nextRetryTime).getTime() : 0;
const timeB = b.nextRetryTime ? new Date(b.nextRetryTime).getTime() : 0;
return timeA - timeB;
},
sorter: true,
sortOrder: this.state.sortField === "nextRetryTime" ? this.state.sortOrder : null,
render: (text) => text ? Setting.getFormattedDate(text) : "-",
},
{

View File

@@ -14,7 +14,7 @@
import * as Setting from "../Setting";
export function getWebhookEvents(owner = "", organization = "", page = "", pageSize = "", webhookName = "", status = "") {
export function getWebhookEvents(owner = "", organization = "", page = "", pageSize = "", webhookName = "", status = "", sortField = "", sortOrder = "") {
const params = new URLSearchParams({
owner,
organization,
@@ -22,6 +22,8 @@ export function getWebhookEvents(owner = "", organization = "", page = "", pageS
p: page,
webhookName,
status,
sortField,
sortOrder,
});
return fetch(`${Setting.ServerUrl}/api/get-webhook-events?${params.toString()}`, {