Files
casdoor/object/webhook_worker.go

248 lines
6.5 KiB
Go

// Copyright 2026 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package object
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/beego/beego/v2/core/logs"
"github.com/casdoor/casdoor/util"
)
var (
webhookWorkerMu sync.Mutex
webhookWorkerRunning = false
webhookWorkerStop chan struct{}
webhookPollingInterval = 30 * time.Second // Configurable polling interval
webhookBatchSize = 100 // Configurable batch size for processing events
)
// StartWebhookDeliveryWorker starts the background worker for webhook delivery
func StartWebhookDeliveryWorker() {
webhookWorkerMu.Lock()
defer webhookWorkerMu.Unlock()
if webhookWorkerRunning {
return
}
stopCh := make(chan struct{})
webhookWorkerStop = stopCh
webhookWorkerRunning = true
util.SafeGoroutine(func() {
ticker := time.NewTicker(webhookPollingInterval)
defer ticker.Stop()
defer func() {
webhookWorkerMu.Lock()
defer webhookWorkerMu.Unlock()
if webhookWorkerStop == stopCh {
webhookWorkerRunning = false
webhookWorkerStop = nil
}
}()
for {
select {
case <-stopCh:
return
case <-ticker.C:
processWebhookEvents()
}
}
})
}
// StopWebhookDeliveryWorker stops the background worker
func StopWebhookDeliveryWorker() {
webhookWorkerMu.Lock()
defer webhookWorkerMu.Unlock()
if !webhookWorkerRunning {
return
}
if webhookWorkerStop == nil {
webhookWorkerRunning = false
return
}
close(webhookWorkerStop)
webhookWorkerStop = nil
webhookWorkerRunning = false
}
// processWebhookEvents processes pending webhook events
func processWebhookEvents() {
events, err := GetPendingWebhookEvents(webhookBatchSize)
if err != nil {
logs.Error(fmt.Sprintf("failed to get pending webhook events: %v", err))
return
}
for _, event := range events {
deliverWebhookEvent(event)
}
}
// deliverWebhookEvent attempts to deliver a single webhook event
func deliverWebhookEvent(event *WebhookEvent) {
// Get the webhook configuration
webhook, err := GetWebhook(event.WebhookName)
if err != nil {
logs.Error(fmt.Sprintf("failed to get webhook %s: %v", event.WebhookName, err))
return
}
if webhook == nil {
// Webhook has been deleted, mark event as failed
event.Status = WebhookEventStatusFailed
event.LastError = "Webhook not found"
UpdateWebhookEventStatus(event, WebhookEventStatusFailed, 0, "", fmt.Errorf("webhook not found"))
return
}
if !webhook.IsEnabled {
// Disabled webhooks should finalize the event to avoid hot-looping forever.
UpdateWebhookEventStatus(event, WebhookEventStatusFailed, 0, "", fmt.Errorf("webhook is disabled"))
return
}
// Parse the record from payload
var record Record
err = json.Unmarshal([]byte(event.Payload), &record)
if err != nil {
event.Status = WebhookEventStatusFailed
event.LastError = fmt.Sprintf("Invalid payload: %v", err)
UpdateWebhookEventStatus(event, WebhookEventStatusFailed, 0, "", err)
return
}
// Parse extended user if present
var extendedUser *User
if event.ExtendedUser != "" {
extendedUser = &User{}
err = json.Unmarshal([]byte(event.ExtendedUser), extendedUser)
if err != nil {
logs.Warning(fmt.Sprintf("failed to parse extended user for webhook event %s: %v", event.GetId(), err))
extendedUser = nil
}
}
// Increment attempt count
event.AttemptCount++
// Attempt to send the webhook
statusCode, respBody, err := sendWebhook(webhook, &record, extendedUser)
// Add webhook record for backward compatibility (only if non-200 status)
if statusCode != 200 {
addWebhookRecord(webhook, &record, statusCode, respBody, err)
}
// Determine the result
if err == nil && statusCode >= 200 && statusCode < 300 {
// Success
UpdateWebhookEventStatus(event, WebhookEventStatusSuccess, statusCode, respBody, nil)
} else {
// Failed - decide whether to retry
maxRetries := event.MaxRetries
if maxRetries <= 0 {
maxRetries = webhook.MaxRetries
}
if maxRetries <= 0 {
maxRetries = 3 // Default
}
event.MaxRetries = maxRetries
if event.AttemptCount >= maxRetries {
// Max retries reached, mark as permanently failed
UpdateWebhookEventStatus(event, WebhookEventStatusFailed, statusCode, respBody, err)
} else {
// Schedule retry
retryInterval := webhook.RetryInterval
if retryInterval <= 0 {
retryInterval = 60 // Default 60 seconds
}
nextRetryTime := calculateNextRetryTime(event.AttemptCount, retryInterval, webhook.UseExponentialBackoff)
event.NextRetryTime = nextRetryTime
event.Status = WebhookEventStatusRetrying
UpdateWebhookEventStatus(event, WebhookEventStatusRetrying, statusCode, respBody, err)
}
}
}
// calculateNextRetryTime calculates the next retry time based on attempt count and backoff strategy
func calculateNextRetryTime(attemptCount int, baseInterval int, useExponentialBackoff bool) string {
var delaySeconds int
if useExponentialBackoff {
// Exponential backoff: baseInterval * 2^(attemptCount-1)
// Cap attemptCount at 10 to prevent overflow
cappedAttemptCount := attemptCount - 1
if cappedAttemptCount > 10 {
cappedAttemptCount = 10
}
// Calculate delay with overflow protection
delaySeconds = baseInterval * (1 << uint(cappedAttemptCount))
// Cap at 1 hour
if delaySeconds > 3600 {
delaySeconds = 3600
}
} else {
// Fixed interval
delaySeconds = baseInterval
}
nextTime := time.Now().Add(time.Duration(delaySeconds) * time.Second)
return nextTime.Format("2006-01-02T15:04:05Z07:00")
}
// ReplayWebhookEvent replays a failed or missed webhook event
func ReplayWebhookEvent(eventId string) error {
event, err := GetWebhookEvent(eventId)
if err != nil {
return err
}
if event == nil {
return fmt.Errorf("webhook event not found: %s", eventId)
}
// Reset the event for replay
event.Status = WebhookEventStatusPending
event.AttemptCount = 0
event.NextRetryTime = ""
event.LastError = ""
_, err = UpdateWebhookEvent(event.GetId(), event)
if err != nil {
return err
}
// Immediately try to deliver
deliverWebhookEvent(event)
return nil
}