server: add client registry with dashboard support (#5115)

This commit is contained in:
fatedier
2026-01-08 20:07:14 +08:00
committed by GitHub
parent bc378bcbec
commit 36718d88e4
59 changed files with 4150 additions and 1837 deletions

146
server/client_registry.go Normal file
View File

@@ -0,0 +1,146 @@
package server
import (
"fmt"
"maps"
"sync"
"time"
)
// ClientInfo captures metadata about a connected frpc instance.
type ClientInfo struct {
Key string
User string
ClientID string
RunID string
Hostname string
Metas map[string]string
FirstConnectedAt time.Time
LastConnectedAt time.Time
DisconnectedAt time.Time
Online bool
}
// ClientRegistry keeps track of active clients keyed by "{user}.{clientID}" (or runID if clientID is empty).
// Entries without an explicit clientID are removed on disconnect to avoid stale offline records.
type ClientRegistry struct {
mu sync.RWMutex
clients map[string]*ClientInfo
runIndex map[string]string
}
func NewClientRegistry() *ClientRegistry {
return &ClientRegistry{
clients: make(map[string]*ClientInfo),
runIndex: make(map[string]string),
}
}
// Register stores/updates metadata for a client and returns the registry key plus whether it conflicts with an online client.
func (cr *ClientRegistry) Register(user, clientID, runID, hostname string, metas map[string]string) (key string, conflict bool) {
if runID == "" {
return "", false
}
effectiveID := clientID
if effectiveID == "" {
effectiveID = runID
}
key = cr.composeClientKey(user, effectiveID)
enforceUnique := clientID != ""
now := time.Now()
cr.mu.Lock()
defer cr.mu.Unlock()
info, exists := cr.clients[key]
if enforceUnique && exists && info.Online && info.RunID != "" && info.RunID != runID {
return key, true
}
if !exists {
info = &ClientInfo{
Key: key,
User: user,
ClientID: clientID,
FirstConnectedAt: now,
}
cr.clients[key] = info
} else if info.RunID != "" {
delete(cr.runIndex, info.RunID)
}
info.RunID = runID
info.Hostname = hostname
info.Metas = metas
if info.FirstConnectedAt.IsZero() {
info.FirstConnectedAt = now
}
info.LastConnectedAt = now
info.DisconnectedAt = time.Time{}
info.Online = true
cr.runIndex[runID] = key
return key, false
}
// MarkOfflineByRunID marks the client as offline when the corresponding control disconnects.
func (cr *ClientRegistry) MarkOfflineByRunID(runID string) {
cr.mu.Lock()
defer cr.mu.Unlock()
key, ok := cr.runIndex[runID]
if !ok {
return
}
if info, ok := cr.clients[key]; ok && info.RunID == runID {
if info.ClientID == "" {
delete(cr.clients, key)
} else {
info.RunID = ""
info.Online = false
now := time.Now()
info.DisconnectedAt = now
}
}
delete(cr.runIndex, runID)
}
// List returns a snapshot of all known clients.
func (cr *ClientRegistry) List() []ClientInfo {
cr.mu.RLock()
defer cr.mu.RUnlock()
result := make([]ClientInfo, 0, len(cr.clients))
for _, info := range cr.clients {
cp := *info
cp.Metas = maps.Clone(info.Metas)
result = append(result, cp)
}
return result
}
// GetByKey retrieves a client by its composite key ({user}.{clientID} or runID fallback).
func (cr *ClientRegistry) GetByKey(key string) (ClientInfo, bool) {
cr.mu.RLock()
defer cr.mu.RUnlock()
info, ok := cr.clients[key]
if !ok {
return ClientInfo{}, false
}
cp := *info
cp.Metas = maps.Clone(info.Metas)
return cp, true
}
func (cr *ClientRegistry) composeClientKey(user, id string) string {
switch {
case user == "":
return id
case id == "":
return user
default:
return fmt.Sprintf("%s.%s", user, id)
}
}

View File

@@ -147,6 +147,8 @@ type Control struct {
// Server configuration information
serverCfg *v1.ServerConfig
clientRegistry *ClientRegistry
xl *xlog.Logger
ctx context.Context
doneCh chan struct{}
@@ -358,6 +360,7 @@ func (ctl *Control) worker() {
}
metrics.Server.CloseClient()
ctl.clientRegistry.MarkOfflineByRunID(ctl.runID)
xl.Infof("client exit success")
close(ctl.doneCh)
}

View File

@@ -17,8 +17,11 @@ package server
import (
"cmp"
"encoding/json"
"fmt"
"net/http"
"slices"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -53,6 +56,8 @@ func (svr *Service) registerRouteHandlers(helper *httppkg.RouterRegisterHelper)
subRouter.HandleFunc("/api/proxy/{type}", svr.apiProxyByType).Methods("GET")
subRouter.HandleFunc("/api/proxy/{type}/{name}", svr.apiProxyByTypeAndName).Methods("GET")
subRouter.HandleFunc("/api/traffic/{name}", svr.apiProxyTraffic).Methods("GET")
subRouter.HandleFunc("/api/clients", svr.apiClientList).Methods("GET")
subRouter.HandleFunc("/api/clients/{key}", svr.apiClientDetail).Methods("GET")
subRouter.HandleFunc("/api/proxies", svr.deleteProxies).Methods("DELETE")
// view
@@ -88,6 +93,19 @@ type serverInfoResp struct {
ProxyTypeCounts map[string]int64 `json:"proxyTypeCount"`
}
type clientInfoResp struct {
Key string `json:"key"`
User string `json:"user"`
ClientID string `json:"clientId"`
RunID string `json:"runId"`
Hostname string `json:"hostname"`
Metas map[string]string `json:"metas,omitempty"`
FirstConnectedAt int64 `json:"firstConnectedAt"`
LastConnectedAt int64 `json:"lastConnectedAt"`
DisconnectedAt int64 `json:"disconnectedAt,omitempty"`
Online bool `json:"online"`
}
// /healthz
func (svr *Service) healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
@@ -132,6 +150,101 @@ func (svr *Service) apiServerInfo(w http.ResponseWriter, r *http.Request) {
res.Msg = string(buf)
}
// /api/clients
func (svr *Service) apiClientList(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200}
defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.RequestURI(), res.Code)
w.WriteHeader(res.Code)
if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg))
}
}()
log.Infof("http request: [%s]", r.URL.RequestURI())
if svr.clientRegistry == nil {
res.Code = http.StatusInternalServerError
res.Msg = "client registry unavailable"
return
}
query := r.URL.Query()
userFilter := query.Get("user")
clientIDFilter := query.Get("clientId")
runIDFilter := query.Get("runId")
statusFilter := strings.ToLower(query.Get("status"))
records := svr.clientRegistry.List()
items := make([]clientInfoResp, 0, len(records))
for _, info := range records {
if userFilter != "" && info.User != userFilter {
continue
}
if clientIDFilter != "" && info.ClientID != clientIDFilter {
continue
}
if runIDFilter != "" && info.RunID != runIDFilter {
continue
}
if !matchStatusFilter(info.Online, statusFilter) {
continue
}
items = append(items, buildClientInfoResp(info))
}
slices.SortFunc(items, func(a, b clientInfoResp) int {
if v := cmp.Compare(a.User, b.User); v != 0 {
return v
}
if v := cmp.Compare(a.ClientID, b.ClientID); v != 0 {
return v
}
return cmp.Compare(a.Key, b.Key)
})
buf, _ := json.Marshal(items)
res.Msg = string(buf)
}
// /api/clients/{key}
func (svr *Service) apiClientDetail(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200}
defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.RequestURI(), res.Code)
w.WriteHeader(res.Code)
if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg))
}
}()
log.Infof("http request: [%s]", r.URL.RequestURI())
vars := mux.Vars(r)
key := vars["key"]
if key == "" {
res.Code = http.StatusBadRequest
res.Msg = "missing client key"
return
}
if svr.clientRegistry == nil {
res.Code = http.StatusInternalServerError
res.Msg = "client registry unavailable"
return
}
info, ok := svr.clientRegistry.GetByKey(key)
if !ok {
res.Code = http.StatusNotFound
res.Msg = fmt.Sprintf("client %s not found", key)
return
}
buf, _ := json.Marshal(buildClientInfoResp(info))
res.Msg = string(buf)
}
type BaseOutConf struct {
v1.ProxyBaseConfig
}
@@ -404,3 +517,41 @@ func (svr *Service) deleteProxies(w http.ResponseWriter, r *http.Request) {
cleared, total := mem.StatsCollector.ClearOfflineProxies()
log.Infof("cleared [%d] offline proxies, total [%d] proxies", cleared, total)
}
func buildClientInfoResp(info ClientInfo) clientInfoResp {
resp := clientInfoResp{
Key: info.Key,
User: info.User,
ClientID: info.ClientID,
RunID: info.RunID,
Hostname: info.Hostname,
Metas: info.Metas,
FirstConnectedAt: toUnix(info.FirstConnectedAt),
LastConnectedAt: toUnix(info.LastConnectedAt),
Online: info.Online,
}
if !info.DisconnectedAt.IsZero() {
resp.DisconnectedAt = info.DisconnectedAt.Unix()
}
return resp
}
func toUnix(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.Unix()
}
func matchStatusFilter(online bool, filter string) bool {
switch strings.ToLower(filter) {
case "", "all":
return true
case "online":
return online
case "offline":
return !online
default:
return true
}
}

View File

@@ -96,6 +96,9 @@ type Service struct {
// Manage all controllers
ctlManager *ControlManager
// Track logical clients keyed by user.clientID.
clientRegistry *ClientRegistry
// Manage all proxies
pxyManager *proxy.Manager
@@ -155,9 +158,10 @@ func NewService(cfg *v1.ServerConfig) (*Service, error) {
}
svr := &Service{
ctlManager: NewControlManager(),
pxyManager: proxy.NewManager(),
pluginManager: plugin.NewManager(),
ctlManager: NewControlManager(),
clientRegistry: NewClientRegistry(),
pxyManager: proxy.NewManager(),
pluginManager: plugin.NewManager(),
rc: &controller.ResourceController{
VisitorManager: visitor.NewManager(),
TCPPortManager: ports.NewManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
@@ -606,10 +610,19 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login, inter
// don't return detailed errors to client
return fmt.Errorf("unexpected error when creating new controller")
}
if oldCtl := svr.ctlManager.Add(loginMsg.RunID, ctl); oldCtl != nil {
oldCtl.WaitClosed()
}
_, conflict := svr.clientRegistry.Register(loginMsg.User, loginMsg.ClientID, loginMsg.RunID, loginMsg.Hostname, loginMsg.Metas)
if conflict {
svr.ctlManager.Del(loginMsg.RunID, ctl)
ctl.Close()
return fmt.Errorf("client_id [%s] for user [%s] is already online", loginMsg.ClientID, loginMsg.User)
}
ctl.clientRegistry = svr.clientRegistry
ctl.Start()
// for statistics