Files
compute-blade-agent/pkg/events/eventbus.go
Cedric Kienzler 70541d86ba feat(agent)!: add support for mTLS authentication in gRPC server (#54)
* refactor(fancontroller): improve fan controller validation logic and error handling for temperature steps

* refactor(agent): restructure gRPC server implementation by moving it to a new api package for better organization and maintainability

* feat(agent): implement gRPC server for managing compute blade agents and add graceful shutdown support
refactor(agent): restructure agent code by moving API logic to a dedicated file and improving error handling
fix(agent): update logging messages for clarity and consistency across the agent's operations
chore(agent): remove unused API code and consolidate event handling logic for better maintainability
style(agent): improve code formatting and organization for better readability and adherence to conventions

* feat(agent): add support for TLS configuration in gRPC server

* feat(api): add gRPC server authentication

* fix

* feat(config): add listen mode configuration to support tcp or unix sockets
feat(agent): implement listen mode in gRPC service to allow flexible socket types
feat(bladectl): enhance configuration loading and add support for TLS credentials
fix(bladectl): improve error handling for gRPC connection and event emission
style(logging): change log level from Warn to Info for better clarity in logs

* add logging middleware + fixes

* fix remote-connection to gRPC API Server

debugging the SAN issues took the soul out of me... And then the stupid
mistake in cmd_root where I didn't construct the TLS credentials
correctly... Oh dear...

* cleanup

* cleanup

* cleanup commands

* cleanup

* make README.md nicer

* Update cmd/agent/main.go

Co-authored-by: Matthias Riegler <github@m4tbit.de>

* Update cmd/bladectl/cmd_root.go

Co-authored-by: Matthias Riegler <github@m4tbit.de>

* move bladectl config into correct directory

* fix bugs

* // FIXME: No dead code

* nit: code style

* nit(YAGNI): you aint gonna need it. Don't make life harder than it needs to be

* nit(YAGNI): you aint gonna need it. Don't make life harder than it needs to be

* nit(YAGNI): you aint gonna need it. Don't make life harder than it needs to be

* nit(cmd_identify)

---------

Co-authored-by: Matthias Riegler <github@m4tbit.de>
2025-05-12 00:00:55 +02:00

103 lines
2.0 KiB
Go

package events
import (
"sync"
)
// EventBus is a simple event bus with topic-based publish/subscribe.
// This is, by no means, a performant or complete implementation but for the scope of this project more than sufficient
type EventBus interface {
Publish(topic string, message any)
Subscribe(topic string, bufSize int, filter func(any) bool) Subscriber
}
type Subscriber interface {
C() <-chan any
Unsubscribe()
}
type eventBus struct {
subscribers map[string]map[*subscriber]func(any) bool
mu sync.Mutex
}
type subscriber struct {
mu sync.Mutex
ch chan any
closed bool
}
func MatchAll(_ any) bool {
return true
}
// New returns an initialized EventBus.
func New() EventBus {
return &eventBus{
subscribers: make(map[string]map[*subscriber]func(any) bool),
}
}
// Publish a message to a topic (best-effort). Subscribers with a full receive queue are dropped.
func (eb *eventBus) Publish(topic string, message any) {
eb.mu.Lock()
defer eb.mu.Unlock()
if eb.subscribers[topic] == nil {
return
}
if subs, ok := eb.subscribers[topic]; ok {
for sub, filter := range subs {
sub.mu.Lock()
// Clean up closed subscribers
if sub.closed {
delete(eb.subscribers[topic], sub)
continue
}
if filter(message) {
// Try to send message, but don't block
select {
case sub.ch <- message:
default:
}
}
sub.mu.Unlock()
}
}
}
// Subscribe to a topic with a filter function. Returns a channel with given buffer size.
func (eb *eventBus) Subscribe(topic string, bufSize int, filter func(any) bool) Subscriber {
eb.mu.Lock()
defer eb.mu.Unlock()
ch := make(chan any, bufSize)
sub := &subscriber{
ch: ch,
closed: false,
}
if _, ok := eb.subscribers[topic]; !ok {
eb.subscribers[topic] = make(map[*subscriber]func(any) bool)
}
eb.subscribers[topic][sub] = filter
return sub
}
func (s *subscriber) C() <-chan any {
return s.ch
}
func (s *subscriber) Unsubscribe() {
s.mu.Lock()
defer s.mu.Unlock()
close(s.ch)
s.closed = true
}