fix: refactor out log/agent_openclaw.go

This commit is contained in:
Yang Luo
2026-04-05 01:52:25 +08:00
parent e73cfe8b40
commit fab57364db
11 changed files with 168 additions and 65 deletions

View File

@@ -15,7 +15,6 @@
package controllers
import (
"github.com/casdoor/casdoor/object"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
@@ -33,8 +32,8 @@ func (c *ApiController) AddOtlpTrace() {
if body == nil {
return
}
providerName := resolveOpenClawProvider(c.Ctx)
if providerName == "" {
provider := resolveOpenClawProvider(c.Ctx)
if provider == nil {
return
}
@@ -50,7 +49,7 @@ func (c *ApiController) AddOtlpTrace() {
return
}
if _, err := object.AddEntry(object.NewTraceEntry(providerName, message)); err != nil {
if err := provider.AddTrace(message); err != nil {
responseOtlpError(c.Ctx, 500, "save trace failed: %v", err)
return
}
@@ -71,8 +70,8 @@ func (c *ApiController) AddOtlpMetrics() {
if body == nil {
return
}
providerName := resolveOpenClawProvider(c.Ctx)
if providerName == "" {
provider := resolveOpenClawProvider(c.Ctx)
if provider == nil {
return
}
@@ -88,7 +87,7 @@ func (c *ApiController) AddOtlpMetrics() {
return
}
if _, err := object.AddEntry(object.NewOtlpEntry(providerName, "metrics", message)); err != nil {
if err := provider.AddMetrics(message); err != nil {
responseOtlpError(c.Ctx, 500, "save metrics failed: %v", err)
return
}
@@ -109,8 +108,8 @@ func (c *ApiController) AddOtlpLogs() {
if body == nil {
return
}
providerName := resolveOpenClawProvider(c.Ctx)
if providerName == "" {
provider := resolveOpenClawProvider(c.Ctx)
if provider == nil {
return
}
@@ -126,7 +125,7 @@ func (c *ApiController) AddOtlpLogs() {
return
}
if _, err := object.AddEntry(object.NewOtlpEntry(providerName, "log", message)); err != nil {
if err := provider.AddLogs(message); err != nil {
responseOtlpError(c.Ctx, 500, "save logs failed: %v", err)
return
}

View File

@@ -20,6 +20,7 @@ import (
"strings"
"github.com/beego/beego/v2/server/web/context"
"github.com/casdoor/casdoor/log"
"github.com/casdoor/casdoor/object"
"github.com/casdoor/casdoor/util"
)
@@ -29,18 +30,18 @@ func responseOtlpError(ctx *context.Context, status int, format string, args ...
ctx.Output.Body([]byte(fmt.Sprintf(format, args...)))
}
func resolveOpenClawProvider(ctx *context.Context) string {
func resolveOpenClawProvider(ctx *context.Context) *log.OpenClawProvider {
clientIP := util.GetClientIpFromRequest(ctx.Request)
provider, err := object.GetOpenClawProviderByIP(clientIP)
if err != nil {
responseOtlpError(ctx, 500, "provider lookup failed: %v", err)
return ""
return nil
}
if provider == nil {
responseOtlpError(ctx, 403, "forbidden: no OpenClaw provider configured for IP %s", clientIP)
return ""
return nil
}
return provider.Name
return provider
}
func readProtobufBody(ctx *context.Context) []byte {

62
log/agent_openclaw.go Normal file
View File

@@ -0,0 +1,62 @@
// Copyright 2026 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log
import "fmt"
// OtlpAdder persists a single OTLP record into the backing store.
// Parameters: entryType ("trace"/"metrics"/"log") and message (JSON payload).
// The unique entry name is generated by the implementation.
type OtlpAdder func(entryType, message string) error
// OpenClawProvider receives OpenTelemetry data pushed by an OpenClaw agent over
// HTTP and persists each record as an Entry row via the OtlpAdder supplied at
// construction time. It is passive (push-based via HTTP): Start/Stop are no-ops
// and Write is not applicable.
type OpenClawProvider struct {
providerName string
addOtlpEntry OtlpAdder
}
// NewOpenClawProvider creates an OpenClawProvider backed by addOtlpEntry.
func NewOpenClawProvider(providerName string, addOtlpEntry OtlpAdder) *OpenClawProvider {
return &OpenClawProvider{providerName: providerName, addOtlpEntry: addOtlpEntry}
}
// Write is not applicable for an HTTP-push provider and always returns an error.
func (p *OpenClawProvider) Write(_, _ string) error {
return fmt.Errorf("OpenClawProvider receives data over HTTP and does not accept Write calls")
}
// Start is a no-op; OpenClawProvider is passive and has no background goroutine.
func (p *OpenClawProvider) Start(_ EntryAdder, _ func(error)) error { return nil }
// Stop is a no-op.
func (p *OpenClawProvider) Stop() error { return nil }
// AddTrace persists an OTLP trace payload (already serialised to JSON).
func (p *OpenClawProvider) AddTrace(message []byte) error {
return p.addOtlpEntry("trace", string(message))
}
// AddMetrics persists an OTLP metrics payload (already serialised to JSON).
func (p *OpenClawProvider) AddMetrics(message []byte) error {
return p.addOtlpEntry("metrics", string(message))
}
// AddLogs persists an OTLP logs payload (already serialised to JSON).
func (p *OpenClawProvider) AddLogs(message []byte) error {
return p.addOtlpEntry("log", string(message))
}

View File

@@ -35,9 +35,8 @@ func NewPermissionLogProvider(providerName string, addEntry EntryAdder) *Permiss
// Write stores one permission-log entry.
// severity follows syslog conventions (e.g. info, warning, err).
func (p *PermissionLogProvider) Write(severity string, message string) error {
name := fmt.Sprintf("%x", time.Now().UnixNano())
createdTime := time.Now().UTC().Format(time.RFC3339)
return p.addEntry("built-in", name, createdTime, p.providerName, fmt.Sprintf("[%s] %s", severity, message))
return p.addEntry("built-in", createdTime, p.providerName, fmt.Sprintf("[%s] %s", severity, message))
}
// Start is a no-op for PermissionLogProvider; it received its EntryAdder at

View File

@@ -14,14 +14,25 @@
package log
import "fmt"
import (
"fmt"
"github.com/thanhpk/randstr"
)
// GenerateEntryName returns a cryptographically random 32-character hex string
// suitable for use as an Entry.Name primary key.
func GenerateEntryName() string {
return randstr.Hex(16)
}
// EntryAdder persists a collected log entry into the backing store.
// Parameters map to the Entry table columns: owner, name (unique ID),
// createdTime (RFC3339), provider (the log provider name), and message.
// Parameters map to the Entry table columns: owner, createdTime (RFC3339),
// provider (the log provider name), and message. The unique entry name is
// generated by the implementation, so callers do not need to supply one.
// Defined here so it is shared by all LogProvider implementations without
// creating import cycles with the object package.
type EntryAdder func(owner, name, createdTime, provider, message string) error
type EntryAdder func(owner, createdTime, provider, message string) error
// LogProvider is the common interface for all log providers.
//

View File

@@ -85,9 +85,7 @@ func collectSELinuxLogs(ctx context.Context, addEntry EntryAdder) error {
severity := selinuxSeverity(line)
createdTime := parseAuditTimestamp(line)
name := fmt.Sprintf("%x", time.Now().UnixNano())
if err := addEntry("built-in", name, createdTime, "",
if err := addEntry("built-in", createdTime, "",
fmt.Sprintf("[%s] %s", severity, line)); err != nil {
return fmt.Errorf("SELinuxLogProvider: failed to persist audit entry: %w", err)
}

View File

@@ -68,9 +68,7 @@ func (u *unixCollector) collect(ctx context.Context, addEntry EntryAdder) error
severity := journalSeverity(fields)
message := journalMessage(fields)
createdTime := journalTimestamp(fields)
name := fmt.Sprintf("%x", time.Now().UnixNano())
if err := addEntry("built-in", name, createdTime, u.tag,
if err := addEntry("built-in", createdTime, u.tag,
fmt.Sprintf("[%s] %s", severity, message)); err != nil {
return fmt.Errorf("SystemLogProvider: failed to persist journal entry: %w", err)
}

View File

@@ -121,9 +121,7 @@ func (w *windowsCollector) parseAndPersistEvents(out []byte, channel string, add
message = fmt.Sprintf("EventID=%d Source=%s", event.System.EventID, event.System.Provider.Name)
}
createdTime := winEventTimestamp(event.System.TimeCreated.SystemTime)
name := fmt.Sprintf("%x", time.Now().UnixNano())
if err := addEntry("built-in", name, createdTime, w.tag,
if err := addEntry("built-in", createdTime, w.tag,
fmt.Sprintf("[%s] [%s] %s", severity, channel, message)); err != nil {
return fmt.Errorf("SystemLogProvider: failed to persist event (channel=%s EventID=%d): %w",
channel, event.System.EventID, err)

View File

@@ -37,25 +37,6 @@ type Entry struct {
Message string `xorm:"mediumtext" json:"message"`
}
func NewTraceEntry(providerName string, message []byte) *Entry {
return NewOtlpEntry(providerName, "trace", message)
}
func NewOtlpEntry(providerName string, entryType string, message []byte) *Entry {
currentTime := util.GetCurrentTime()
id := fmt.Sprintf("%s_%s_%s", entryType, util.GenerateSimpleTimeId(), util.GetRandomName())
return &Entry{
Owner: CasdoorOrganization,
Name: id,
CreatedTime: currentTime,
UpdatedTime: currentTime,
DisplayName: id,
Provider: providerName,
Type: entryType,
Message: string(message),
}
}
func GetEntries(owner string) ([]*Entry, error) {
entries := []*Entry{}

View File

@@ -17,7 +17,6 @@ package object
import (
"fmt"
"sync"
"time"
"github.com/casdoor/casdoor/log"
)
@@ -28,7 +27,8 @@ var (
)
// InitLogProviders scans all globally-configured Log providers and starts
// background collection for pull-based providers (e.g. System Log).
// background collection for pull-based providers (e.g. System Log, SELinux Log)
// and registers passive providers (e.g. OpenClaw).
// It is called once from main() after the database is ready.
func InitLogProviders() {
providers, err := GetGlobalProviders()
@@ -36,21 +36,29 @@ func InitLogProviders() {
return
}
for _, p := range providers {
if p.Category == "Log" && (p.Type == "System Log" || p.Type == "SELinux Log") {
if p.Category != "Log" {
continue
}
switch p.Type {
case "System Log", "SELinux Log":
startLogCollector(p)
case "Agent":
if p.SubType == "OpenClaw" {
startOpenClawProvider(p)
}
}
}
}
// startLogCollector starts a System Log collector for the given provider.
// If a collector for the same provider is already running it is stopped first.
// startLogCollector starts a pull-based log collector (System Log / SELinux Log)
// for the given provider. If a collector for the same provider is already
// running it is stopped first.
func startLogCollector(provider *Provider) {
runningCollectorsMu.Lock()
defer runningCollectorsMu.Unlock()
id := provider.GetId()
// Stop the existing collector for this provider if any.
if existing, ok := runningCollectors[id]; ok {
_ = existing.Stop()
delete(runningCollectors, id)
@@ -67,7 +75,8 @@ func startLogCollector(provider *Provider) {
}
providerName := provider.Name
addEntry := func(owner, name, createdTime, _ string, message string) error {
addEntry := func(owner, createdTime, _ string, message string) error {
name := log.GenerateEntryName()
entry := &Entry{
Owner: owner,
Name: name,
@@ -92,23 +101,49 @@ func startLogCollector(provider *Provider) {
runningCollectors[id] = lp
}
// GetOpenClawProviderByIP returns the first Log/Agent/OpenClaw provider that
// allows the given client IP. A provider with an empty Host field allows any IP.
func GetOpenClawProviderByIP(clientIP string) (*Provider, error) {
// startOpenClawProvider registers an OpenClaw provider in runningCollectors so
// that incoming OTLP requests can be routed to it by IP.
func startOpenClawProvider(provider *Provider) {
runningCollectorsMu.Lock()
defer runningCollectorsMu.Unlock()
id := provider.GetId()
if existing, ok := runningCollectors[id]; ok {
_ = existing.Stop()
delete(runningCollectors, id)
}
lp, err := GetLogProviderFromProvider(provider)
if err != nil {
fmt.Printf("InitLogProviders: failed to create OpenClaw provider %s: %v\n", provider.Name, err)
return
}
runningCollectors[id] = lp
}
// GetOpenClawProviderByIP returns the running OpenClawProvider whose Host field
// matches clientIP, or whose Host is empty (meaning any IP is allowed).
// Returns nil if no matching provider is registered.
func GetOpenClawProviderByIP(clientIP string) (*log.OpenClawProvider, error) {
providers := []*Provider{}
err := ormer.Engine.Where("category = ? AND type = ? AND sub_type = ?", "Log", "Agent", "OpenClaw").Find(&providers)
if err != nil {
return nil, err
}
runningCollectorsMu.Lock()
defer runningCollectorsMu.Unlock()
for _, p := range providers {
if p.Host == "" || p.Host == clientIP {
return p, nil
if lp, ok := runningCollectors[p.GetId()]; ok {
if ocp, ok := lp.(*log.OpenClawProvider); ok {
return ocp, nil
}
}
}
}
return nil, nil
}
// makeEntryName returns a hex-encoded unique name for an Entry row.
func makeEntryName() string {
return fmt.Sprintf("%x", time.Now().UnixNano())
}

View File

@@ -618,7 +618,8 @@ func GetLogProviderFromProvider(provider *Provider) (log.LogProvider, error) {
}
if provider.Type == "Casdoor Permission Log" {
return log.NewPermissionLogProvider(provider.Name, func(owner, name, createdTime, providerName, message string) error {
return log.NewPermissionLogProvider(provider.Name, func(owner, createdTime, providerName, message string) error {
name := log.GenerateEntryName()
entry := &Entry{
Owner: owner,
Name: name,
@@ -634,5 +635,25 @@ func GetLogProviderFromProvider(provider *Provider) (log.LogProvider, error) {
}), nil
}
if provider.Type == "Agent" && provider.SubType == "OpenClaw" {
providerName := provider.Name
return log.NewOpenClawProvider(providerName, func(entryType, message string) error {
name := log.GenerateEntryName()
currentTime := util.GetCurrentTime()
entry := &Entry{
Owner: CasdoorOrganization,
Name: name,
CreatedTime: currentTime,
UpdatedTime: currentTime,
DisplayName: name,
Provider: providerName,
Type: entryType,
Message: message,
}
_, err := AddEntry(entry)
return err
}), nil
}
return log.GetLogProvider(provider.Type, provider.Host, provider.Port, provider.Title)
}