mirror of
https://github.com/compute-blade-community/compute-blade-agent.git
synced 2026-04-16 15:35:42 +02:00
* refactor(fancontroller): improve fan controller validation logic and error handling for temperature steps * refactor(agent): restructure gRPC server implementation by moving it to a new api package for better organization and maintainability * feat(agent): implement gRPC server for managing compute blade agents and add graceful shutdown support refactor(agent): restructure agent code by moving API logic to a dedicated file and improving error handling fix(agent): update logging messages for clarity and consistency across the agent's operations chore(agent): remove unused API code and consolidate event handling logic for better maintainability style(agent): improve code formatting and organization for better readability and adherence to conventions * feat(agent): add support for TLS configuration in gRPC server * feat(api): add gRPC server authentication * fix * feat(config): add listen mode configuration to support tcp or unix sockets feat(agent): implement listen mode in gRPC service to allow flexible socket types feat(bladectl): enhance configuration loading and add support for TLS credentials fix(bladectl): improve error handling for gRPC connection and event emission style(logging): change log level from Warn to Info for better clarity in logs * add logging middleware + fixes * fix remote-connection to gRPC API Server debugging the SAN issues took the soul out of me... And then the stupid mistake in cmd_root where I didn't construct the TLS credentials correctly... Oh dear... * cleanup * cleanup * cleanup commands * cleanup * make README.md nicer * Update cmd/agent/main.go Co-authored-by: Matthias Riegler <github@m4tbit.de> * Update cmd/bladectl/cmd_root.go Co-authored-by: Matthias Riegler <github@m4tbit.de> * move bladectl config into correct directory * fix bugs * // FIXME: No dead code * nit: code style * nit(YAGNI): you aint gonna need it. Don't make life harder than it needs to be * nit(YAGNI): you aint gonna need it. Don't make life harder than it needs to be * nit(YAGNI): you aint gonna need it. Don't make life harder than it needs to be * nit(cmd_identify) --------- Co-authored-by: Matthias Riegler <github@m4tbit.de>
103 lines
2.0 KiB
Go
103 lines
2.0 KiB
Go
package events
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// EventBus is a simple event bus with topic-based publish/subscribe.
|
|
// This is, by no means, a performant or complete implementation but for the scope of this project more than sufficient
|
|
type EventBus interface {
|
|
Publish(topic string, message any)
|
|
Subscribe(topic string, bufSize int, filter func(any) bool) Subscriber
|
|
}
|
|
|
|
type Subscriber interface {
|
|
C() <-chan any
|
|
Unsubscribe()
|
|
}
|
|
|
|
type eventBus struct {
|
|
subscribers map[string]map[*subscriber]func(any) bool
|
|
mu sync.Mutex
|
|
}
|
|
|
|
type subscriber struct {
|
|
mu sync.Mutex
|
|
ch chan any
|
|
closed bool
|
|
}
|
|
|
|
func MatchAll(_ any) bool {
|
|
return true
|
|
}
|
|
|
|
// New returns an initialized EventBus.
|
|
func New() EventBus {
|
|
return &eventBus{
|
|
subscribers: make(map[string]map[*subscriber]func(any) bool),
|
|
}
|
|
}
|
|
|
|
// Publish a message to a topic (best-effort). Subscribers with a full receive queue are dropped.
|
|
func (eb *eventBus) Publish(topic string, message any) {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
if eb.subscribers[topic] == nil {
|
|
return
|
|
}
|
|
|
|
if subs, ok := eb.subscribers[topic]; ok {
|
|
for sub, filter := range subs {
|
|
sub.mu.Lock()
|
|
// Clean up closed subscribers
|
|
if sub.closed {
|
|
delete(eb.subscribers[topic], sub)
|
|
continue
|
|
}
|
|
|
|
if filter(message) {
|
|
// Try to send message, but don't block
|
|
select {
|
|
case sub.ch <- message:
|
|
default:
|
|
}
|
|
}
|
|
|
|
sub.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Subscribe to a topic with a filter function. Returns a channel with given buffer size.
|
|
func (eb *eventBus) Subscribe(topic string, bufSize int, filter func(any) bool) Subscriber {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
ch := make(chan any, bufSize)
|
|
|
|
sub := &subscriber{
|
|
ch: ch,
|
|
closed: false,
|
|
}
|
|
|
|
if _, ok := eb.subscribers[topic]; !ok {
|
|
eb.subscribers[topic] = make(map[*subscriber]func(any) bool)
|
|
}
|
|
|
|
eb.subscribers[topic][sub] = filter
|
|
|
|
return sub
|
|
}
|
|
|
|
func (s *subscriber) C() <-chan any {
|
|
return s.ch
|
|
}
|
|
|
|
func (s *subscriber) Unsubscribe() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
close(s.ch)
|
|
s.closed = true
|
|
}
|