Skip to content
This repository was archived by the owner on Jun 4, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (a *App) Run(ctx context.Context) error {
a.SpawnCriticalJob(a.sessionEventsJob)
<-a.Process.Done()

lastWindow := a.EventWatcher.getWindowStartTime()
a.State.SetLastWindowTime(&lastWindow)

return a.Err()
}

Expand Down Expand Up @@ -179,7 +182,18 @@ func (a *App) init(ctx context.Context) error {
return trace.Wrap(err)
}

t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID)
lastWindowTime, err := s.GetLastWindowTime()
if err != nil {
return trace.Wrap(err)
}
// if lastWindowTime is nil, set it to startTime
// lastWindowTime is used to track the last window of events ingested
// and is updated on exit
if lastWindowTime == nil {
lastWindowTime = startTime
}

t, err := NewTeleportEventsWatcher(ctx, a.Config, *lastWindowTime, latestCursor, latestID)
if err != nil {
return trace.Wrap(err)
}
Expand Down
33 changes: 27 additions & 6 deletions event-handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
// startTimeName is the start time variable name
startTimeName = "start_time"

// windowTimeName is the start time of the last window.
windowTimeName = "window_time"

// cursorName is the cursor variable name
cursorName = "cursor"

Expand Down Expand Up @@ -120,11 +123,25 @@ func createStorageDir(c *StartCmdConfig) (string, error) {

// GetStartTime gets current start time
func (s *State) GetStartTime() (*time.Time, error) {
if !s.dv.Has(startTimeName) {
return s.getTimeKey(startTimeName)
}

// SetStartTime sets current start time
func (s *State) SetStartTime(t *time.Time) error {
return s.setTimeKey(startTimeName, t)
}

// GetLastWindowTime gets current start time
func (s *State) GetLastWindowTime() (*time.Time, error) {
return s.getTimeKey(windowTimeName)
}

func (s *State) getTimeKey(keyName string) (*time.Time, error) {
if !s.dv.Has(keyName) {
return nil, nil
}

b, err := s.dv.Read(startTimeName)
b, err := s.dv.Read(keyName)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -144,14 +161,18 @@ func (s *State) GetStartTime() (*time.Time, error) {
return &t, nil
}

// SetStartTime sets current start time
func (s *State) SetStartTime(t *time.Time) error {
func (s *State) setTimeKey(keyName string, t *time.Time) error {
if t == nil {
return s.dv.Write(startTimeName, []byte(""))
return s.dv.Write(keyName, []byte(""))
}

v := t.Truncate(time.Second).Format(time.RFC3339)
return s.dv.Write(startTimeName, []byte(v))
return s.dv.Write(keyName, []byte(v))
}

// SetLastWindowTime sets current start time of the last window used.
func (s *State) SetLastWindowTime(t *time.Time) error {
return s.setTimeKey(windowTimeName, t)
}

// GetCursor gets current cursor value
Expand Down
153 changes: 117 additions & 36 deletions event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"fmt"
"sync"
"time"

"github.com/gravitational/teleport/api/client"
Expand Down Expand Up @@ -69,15 +70,17 @@ type TeleportEventsWatcher struct {
batch []*TeleportEvent
// config is teleport config
config *StartCmdConfig
// startTime is event time frame start
startTime time.Time

// windowStartTime is event time frame start
windowStartTime time.Time
windowStartTimeMu sync.Mutex
}

// NewTeleportEventsWatcher builds Teleport client instance
func NewTeleportEventsWatcher(
ctx context.Context,
c *StartCmdConfig,
startTime time.Time,
windowStartTime time.Time,
cursor string,
id string,
) (*TeleportEventsWatcher, error) {
Expand Down Expand Up @@ -118,12 +121,12 @@ func NewTeleportEventsWatcher(
}

tc := TeleportEventsWatcher{
client: teleportClient,
pos: -1,
cursor: cursor,
config: c,
id: id,
startTime: startTime,
client: teleportClient,
pos: -1,
cursor: cursor,
config: c,
id: id,
windowStartTime: windowStartTime,
}

return &tc, nil
Expand All @@ -150,43 +153,29 @@ func (t *TeleportEventsWatcher) flipPage() bool {
// fetch fetches the page and sets the position to the event after latest known
func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {
log := logger.Get(ctx)
b, nextCursor, err := t.getEvents(ctx)
// Zero batch
t.batch = make([]*TeleportEvent, 0, t.config.BatchSize)
nextCursor, err := t.getEvents(ctx)
if err != nil {
return trace.Wrap(err)
}

// Zero batch
t.batch = make([]*TeleportEvent, 0, len(b))

// Save next cursor
t.nextCursor = nextCursor

// Mark position as unresolved (the page is empty)
t.pos = -1

log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(b)).Debug("Fetched page")
log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(t.batch)).Debug("Fetched page")

// Page is empty: do nothing, return
if len(b) == 0 {
if len(t.batch) == 0 {
t.pos = 0
return nil
}

pos := 0

// Convert batch to TeleportEvent
for _, e := range b {
if _, ok := t.config.SkipEventTypes[e.Type]; ok {
log.WithField("event", e).Debug("Skipping event")
continue
}
evt, err := NewTeleportEvent(e, t.cursor)
if err != nil {
return trace.Wrap(err)
}

t.batch = append(t.batch, evt)
}

// If last known id is not empty, let's try to find it's pos
if t.id != "" {
for i, e := range t.batch {
Expand All @@ -205,18 +194,98 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {
return nil
}

// getEvents calls Teleport client and loads events
func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) {
return t.client.SearchUnstructuredEvents(
// getEvents iterates over the range of days between the last windowStartTime and now.
// It returns a slice of events, a cursor for the next page and an error.
// If the cursor is out of the range, it advances the windowStartTime to the next day.
// It only advances the windowStartTime if no events are found until the last complete day.
func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) {
rangeSplitByDay := splitRangeByDay(t.getWindowStartTime(), time.Now().UTC())
for i := 1; i < len(rangeSplitByDay); i++ {
startTime := rangeSplitByDay[i-1]
endTime := rangeSplitByDay[i]
log.Debugf("Fetching events from %v to %v", startTime, endTime)
evts, cursor, err := t.getEventsInWindow(ctx, startTime, endTime)
if err != nil {
return "", trace.Wrap(err)
}

// Convert batch to TeleportEvent
for _, e := range evts {
if _, ok := t.config.SkipEventTypes[e.Type]; ok {
log.WithField("event", e).Debug("Skipping event")
continue
}
evt, err := NewTeleportEvent(e, t.cursor)
if err != nil {
return "", trace.Wrap(err)
}

t.batch = append(t.batch, evt)
}

// if no events are found, the cursor is out of the range [startTime, endTime]
// and it's the last complete day, update start time to the next day.
if t.canSkipToNextWindow(i, rangeSplitByDay, cursor) {
log.Infof("No new events found for the range %v to %v", startTime, endTime)
t.setWindowStartTime(endTime)
continue
}
// if any events are found, return them
return cursor, nil
}
return t.cursor, nil
}

func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time.Time, cursor string) bool {
if cursor != "" {
return false

}
if len(t.batch) == 0 && i < len(rangeSplitByDay)-1 {
log.Infof("No events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i])
return true
}
pos := 0
// If last known id is not empty, let's try to find if all events are already processed
// and if we can skip to next page
if t.id != "" {
for i, e := range t.batch {
if e.ID == t.id {
pos = i + 1
}
}
}

if i < len(rangeSplitByDay)-1 && pos >= len(t.batch) {
log.WithField("pos", pos).WithField("len", len(t.batch)).Infof("No new events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i])
return true
}
return false
}

// getEvents calls Teleport client and loads events from the audit log.
// It returns a slice of events, a cursor for the next page and an error.
func (t *TeleportEventsWatcher) getEventsInWindow(ctx context.Context, from, to time.Time) ([]*auditlogpb.EventUnstructured, string, error) {
evts, cursor, err := t.client.SearchUnstructuredEvents(
ctx,
t.startTime,
time.Now().UTC(),
from,
to,
"default",
t.config.Types,
t.config.BatchSize,
types.EventOrderAscending,
t.cursor,
)
return evts, cursor, trace.Wrap(err)
}

func splitRangeByDay(from, to time.Time) []time.Time {
// splitRangeByDay splits the range into days
var days []time.Time
for d := from; d.Before(to); d = d.AddDate(0, 0, 1) {
days = append(days, d)
}
return append(days, to) // add the last date
}

// pause sleeps for timeout seconds
Expand Down Expand Up @@ -251,7 +320,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent
}

// If there is still nothing, sleep
if len(t.batch) == 0 {
if len(t.batch) == 0 && t.nextCursor == "" {
if t.config.ExitOnLastEvent {
log.Info("All events are processed, exiting...")
break
Expand Down Expand Up @@ -283,7 +352,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent

// If there is still nothing new on current page, sleep
if t.pos >= len(t.batch) {
if t.config.ExitOnLastEvent {
if t.config.ExitOnLastEvent && t.nextCursor == "" {
log.Info("All events are processed, exiting...")
break
}
Expand Down Expand Up @@ -345,3 +414,15 @@ func (t *TeleportEventsWatcher) UpsertLock(ctx context.Context, user string, log

return t.client.UpsertLock(ctx, lock)
}

func (t *TeleportEventsWatcher) getWindowStartTime() time.Time {
t.windowStartTimeMu.Lock()
defer t.windowStartTimeMu.Unlock()
return t.windowStartTime
}

func (t *TeleportEventsWatcher) setWindowStartTime(time time.Time) {
t.windowStartTimeMu.Lock()
defer t.windowStartTimeMu.Unlock()
t.windowStartTime = time
}
Loading