From 791353c03ba81d1c67393a04256a77293307ecad Mon Sep 17 00:00:00 2001
From: zeripath <art27@cantab.net>
Date: Thu, 7 May 2020 22:49:00 +0100
Subject: [PATCH] Add EventSource support (#11235)

If the browser supports EventSource switch to use this instead of
polling notifications.

Signed-off-by: Andrew Thornton art27@cantab.net
---
 custom/conf/app.ini.sample                         |   5 +-
 .../doc/advanced/config-cheat-sheet.en-us.md       |   3 +-
 integrations/eventsource_test.go                   |  78 ++++++++++++++
 models/notification.go                             |  15 +++
 modules/eventsource/event.go                       | 119 +++++++++++++++++++++
 modules/eventsource/event_test.go                  |  54 ++++++++++
 modules/eventsource/manager.go                     |  84 +++++++++++++++
 modules/eventsource/manager_run.go                 |  50 +++++++++
 modules/eventsource/messenger.go                   |  78 ++++++++++++++
 modules/setting/setting.go                         |  21 ++--
 modules/templates/helper.go                        |   7 +-
 routers/events/events.go                           | 112 +++++++++++++++++++
 routers/init.go                                    |   2 +
 routers/routes/routes.go                           |   3 +
 routers/user/auth.go                               |  12 ++-
 templates/base/head.tmpl                           |   1 +
 web_src/js/features/notification.js                |  59 ++++++++--
 17 files changed, 676 insertions(+), 27 deletions(-)
 create mode 100644 integrations/eventsource_test.go
 create mode 100644 modules/eventsource/event.go
 create mode 100644 modules/eventsource/event_test.go
 create mode 100644 modules/eventsource/manager.go
 create mode 100644 modules/eventsource/manager_run.go
 create mode 100644 modules/eventsource/messenger.go
 create mode 100644 routers/events/events.go

diff --git a/custom/conf/app.ini.sample b/custom/conf/app.ini.sample
index 0c29932d92..06b7b96d40 100644
--- a/custom/conf/app.ini.sample
+++ b/custom/conf/app.ini.sample
@@ -202,12 +202,15 @@ DESCRIPTION = Gitea (Git with a cup of tea) is a painless self-hosted Git servic
 KEYWORDS = go,git,self-hosted,gitea
 
 [ui.notification]
-; Control how often notification is queried to update the notification
+; Control how often the notification endpoint is polled to update the notification
 ; The timeout will increase to MAX_TIMEOUT in TIMEOUT_STEPs if the notification count is unchanged
 ; Set MIN_TIMEOUT to 0 to turn off
 MIN_TIMEOUT = 10s
 MAX_TIMEOUT = 60s
 TIMEOUT_STEP = 10s
+; This setting determines how often the db is queried to get the latest notification counts.
+; If the browser client supports EventSource, it will be used in preference to polling notification.
+EVENT_SOURCE_UPDATE_TIME = 10s
 
 [markdown]
 ; Render soft line breaks as hard line breaks, which means a single newline character between
diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
index f9bc05acbb..3ad24776ff 100644
--- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md
+++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md
@@ -144,9 +144,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
 
 ### UI - Notification (`ui.notification`)
 
-- `MIN_TIMEOUT`: **10s**: These options control how often notification is queried to update the notification count. On page load the notification count will be checked after `MIN_TIMEOUT`. The timeout will increase to `MAX_TIMEOUT` by `TIMEOUT_STEP` if the notification count is unchanged. Set MIN_TIMEOUT to 0 to turn off.
+- `MIN_TIMEOUT`: **10s**: These options control how often notification endpoint is polled to update the notification count. On page load the notification count will be checked after `MIN_TIMEOUT`. The timeout will increase to `MAX_TIMEOUT` by `TIMEOUT_STEP` if the notification count is unchanged. Set MIN_TIMEOUT to 0 to turn off.
 - `MAX_TIMEOUT`: **60s**.
 - `TIMEOUT_STEP`: **10s**.
+- `EVENT_SOURCE_UPDATE_TIME`: **10s**: This setting determines how often the database is queried to update notification counts. If the browser client supports `EventSource`, it will be used in preference to polling notification endpoint.
 
 
 ## Markdown (`markdown`)
diff --git a/integrations/eventsource_test.go b/integrations/eventsource_test.go
new file mode 100644
index 0000000000..bc15453147
--- /dev/null
+++ b/integrations/eventsource_test.go
@@ -0,0 +1,78 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package integrations
+
+import (
+	"fmt"
+	"net/http"
+	"testing"
+	"time"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/eventsource"
+	api "code.gitea.io/gitea/modules/structs"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestEventSourceManagerRun(t *testing.T) {
+	defer prepareTestEnv(t)()
+	manager := eventsource.GetManager()
+
+	eventChan := manager.Register(2)
+	defer func() {
+		manager.Unregister(2, eventChan)
+		// ensure the eventChan is closed
+		for {
+			_, ok := <-eventChan
+			if !ok {
+				break
+			}
+		}
+	}()
+	expectNotificationCountEvent := func(count int64) func() bool {
+		return func() bool {
+			select {
+			case event, ok := <-eventChan:
+				if !ok {
+					return false
+				}
+				data, ok := event.Data.(models.UserIDCount)
+				if !ok {
+					return false
+				}
+				return event.Name == "notification-count" && data.Count == count
+			default:
+				return false
+			}
+		}
+	}
+
+	user2 := models.AssertExistsAndLoadBean(t, &models.User{ID: 2}).(*models.User)
+	repo1 := models.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository)
+	thread5 := models.AssertExistsAndLoadBean(t, &models.Notification{ID: 5}).(*models.Notification)
+	assert.NoError(t, thread5.LoadAttributes())
+	session := loginUser(t, user2.Name)
+	token := getTokenForLoggedInUser(t, session)
+
+	var apiNL []api.NotificationThread
+
+	// -- mark notifications as read --
+	req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/notifications?token=%s", token))
+	resp := session.MakeRequest(t, req, http.StatusOK)
+
+	DecodeJSON(t, resp, &apiNL)
+	assert.Len(t, apiNL, 2)
+
+	lastReadAt := "2000-01-01T00%3A50%3A01%2B00%3A00" //946687801 <- only Notification 4 is in this filter ...
+	req = NewRequest(t, "PUT", fmt.Sprintf("/api/v1/repos/%s/%s/notifications?last_read_at=%s&token=%s", user2.Name, repo1.Name, lastReadAt, token))
+	resp = session.MakeRequest(t, req, http.StatusResetContent)
+
+	req = NewRequest(t, "GET", fmt.Sprintf("/api/v1/notifications?token=%s", token))
+	resp = session.MakeRequest(t, req, http.StatusOK)
+	DecodeJSON(t, resp, &apiNL)
+	assert.Len(t, apiNL, 1)
+
+	assert.Eventually(t, expectNotificationCountEvent(1), 30*time.Second, 1*time.Second)
+}
diff --git a/models/notification.go b/models/notification.go
index d0315ab051..1c378a1350 100644
--- a/models/notification.go
+++ b/models/notification.go
@@ -718,6 +718,21 @@ func getNotificationCount(e Engine, user *User, status NotificationStatus) (coun
 	return
 }
 
+// UserIDCount is a simple coalition of UserID and Count
+type UserIDCount struct {
+	UserID int64
+	Count  int64
+}
+
+// GetUIDsAndNotificationCounts between the two provided times
+func GetUIDsAndNotificationCounts(since, until timeutil.TimeStamp) ([]UserIDCount, error) {
+	sql := `SELECT user_id, count(*) AS count FROM notification ` +
+		`WHERE user_id IN (SELECT user_id FROM notification WHERE updated_unix >= ? AND ` +
+		`updated_unix < ?) AND status = ? GROUP BY user_id`
+	var res []UserIDCount
+	return res, x.SQL(sql, since, until, NotificationStatusUnread).Find(&res)
+}
+
 func setNotificationStatusReadIfUnread(e Engine, userID, issueID int64) error {
 	notification, err := getIssueNotification(e, userID, issueID)
 	// ignore if not exists
diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go
new file mode 100644
index 0000000000..fd418c6f07
--- /dev/null
+++ b/modules/eventsource/event.go
@@ -0,0 +1,119 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"strings"
+	"time"
+)
+
+func wrapNewlines(w io.Writer, prefix []byte, value []byte) (sum int64, err error) {
+	if len(value) == 0 {
+		return
+	}
+	n := 0
+	last := 0
+	for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') {
+		n, err = w.Write(prefix)
+		sum += int64(n)
+		if err != nil {
+			return
+		}
+		n, err = w.Write(value[last : last+j+1])
+		sum += int64(n)
+		if err != nil {
+			return
+		}
+		last += j + 1
+	}
+	n, err = w.Write(prefix)
+	sum += int64(n)
+	if err != nil {
+		return
+	}
+	n, err = w.Write(value[last:])
+	sum += int64(n)
+	if err != nil {
+		return
+	}
+	n, err = w.Write([]byte("\n"))
+	sum += int64(n)
+	return
+}
+
+// Event is an eventsource event, not all fields need to be set
+type Event struct {
+	// Name represents the value of the event: tag in the stream
+	Name string
+	// Data is either JSONified []byte or interface{} that can be JSONd
+	Data interface{}
+	// ID represents the ID of an event
+	ID string
+	// Retry tells the receiver only to attempt to reconnect to the source after this time
+	Retry time.Duration
+}
+
+// WriteTo writes data to w until there's no more data to write or when an error occurs.
+// The return value n is the number of bytes written. Any error encountered during the write is also returned.
+func (e *Event) WriteTo(w io.Writer) (int64, error) {
+	sum := int64(0)
+	nint := 0
+	n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name))
+	sum += n
+	if err != nil {
+		return sum, err
+	}
+
+	if e.Data != nil {
+		var data []byte
+		switch v := e.Data.(type) {
+		case []byte:
+			data = v
+		case string:
+			data = []byte(v)
+		default:
+			var err error
+			data, err = json.Marshal(e.Data)
+			if err != nil {
+				return sum, err
+			}
+		}
+		n, err := wrapNewlines(w, []byte("data: "), data)
+		sum += n
+		if err != nil {
+			return sum, err
+		}
+
+	}
+
+	n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID))
+	sum += n
+	if err != nil {
+		return sum, err
+	}
+
+	if e.Retry != 0 {
+		nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond))
+		sum += int64(nint)
+		if err != nil {
+			return sum, err
+		}
+	}
+
+	nint, err = w.Write([]byte("\n"))
+	sum += int64(nint)
+
+	return sum, err
+}
+
+func (e *Event) String() string {
+	buf := new(strings.Builder)
+	_, _ = e.WriteTo(buf)
+	return buf.String()
+}
diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go
new file mode 100644
index 0000000000..a80e062f0e
--- /dev/null
+++ b/modules/eventsource/event_test.go
@@ -0,0 +1,54 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+	"bytes"
+	"testing"
+)
+
+func Test_wrapNewlines(t *testing.T) {
+	tests := []struct {
+		name   string
+		prefix string
+		value  string
+		output string
+	}{
+		{
+			"check no new lines",
+			"prefix: ",
+			"value",
+			"prefix: value\n",
+		},
+		{
+			"check simple newline",
+			"prefix: ",
+			"value1\nvalue2",
+			"prefix: value1\nprefix: value2\n",
+		},
+		{
+			"check pathological newlines",
+			"p: ",
+			"\n1\n\n2\n3\n",
+			"p: \np: 1\np: \np: 2\np: 3\np: \n",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &bytes.Buffer{}
+			gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value))
+			if err != nil {
+				t.Errorf("wrapNewlines() error = %v", err)
+				return
+			}
+			if gotSum != int64(len(tt.output)) {
+				t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output)))
+			}
+			if gotW := w.String(); gotW != tt.output {
+				t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output)
+			}
+		})
+	}
+}
diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go
new file mode 100644
index 0000000000..212fe60569
--- /dev/null
+++ b/modules/eventsource/manager.go
@@ -0,0 +1,84 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+	"sync"
+)
+
+// Manager manages the eventsource Messengers
+type Manager struct {
+	mutex sync.Mutex
+
+	messengers map[int64]*Messenger
+}
+
+var manager *Manager
+
+func init() {
+	manager = &Manager{
+		messengers: make(map[int64]*Messenger),
+	}
+}
+
+// GetManager returns a Manager and initializes one as singleton if there's none yet
+func GetManager() *Manager {
+	return manager
+}
+
+// Register message channel
+func (m *Manager) Register(uid int64) <-chan *Event {
+	m.mutex.Lock()
+	messenger, ok := m.messengers[uid]
+	if !ok {
+		messenger = NewMessenger(uid)
+		m.messengers[uid] = messenger
+	}
+	m.mutex.Unlock()
+	return messenger.Register()
+}
+
+// Unregister message channel
+func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	messenger, ok := m.messengers[uid]
+	if !ok {
+		return
+	}
+	if messenger.Unregister(channel) {
+		delete(m.messengers, uid)
+	}
+}
+
+// UnregisterAll message channels
+func (m *Manager) UnregisterAll() {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	for _, messenger := range m.messengers {
+		messenger.UnregisterAll()
+	}
+	m.messengers = map[int64]*Messenger{}
+}
+
+// SendMessage sends a message to a particular user
+func (m *Manager) SendMessage(uid int64, message *Event) {
+	m.mutex.Lock()
+	messenger, ok := m.messengers[uid]
+	m.mutex.Unlock()
+	if ok {
+		messenger.SendMessage(message)
+	}
+}
+
+// SendMessageBlocking sends a message to a particular user
+func (m *Manager) SendMessageBlocking(uid int64, message *Event) {
+	m.mutex.Lock()
+	messenger, ok := m.messengers[uid]
+	m.mutex.Unlock()
+	if ok {
+		messenger.SendMessageBlocking(message)
+	}
+}
diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go
new file mode 100644
index 0000000000..75d3ee5b01
--- /dev/null
+++ b/modules/eventsource/manager_run.go
@@ -0,0 +1,50 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import (
+	"context"
+	"time"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/graceful"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/setting"
+	"code.gitea.io/gitea/modules/timeutil"
+)
+
+// Init starts this eventsource
+func (m *Manager) Init() {
+	go graceful.GetManager().RunWithShutdownContext(m.Run)
+}
+
+// Run runs the manager within a provided context
+func (m *Manager) Run(ctx context.Context) {
+	then := timeutil.TimeStampNow().Add(-2)
+	timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			break loop
+		case <-timer.C:
+			now := timeutil.TimeStampNow().Add(-2)
+
+			uidCounts, err := models.GetUIDsAndNotificationCounts(then, now)
+			if err != nil {
+				log.Error("Unable to get UIDcounts: %v", err)
+			}
+			for _, uidCount := range uidCounts {
+				m.SendMessage(uidCount.UserID, &Event{
+					Name: "notification-count",
+					Data: uidCount,
+				})
+			}
+			then = now
+		}
+	}
+	m.UnregisterAll()
+}
diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go
new file mode 100644
index 0000000000..091e1a5c1c
--- /dev/null
+++ b/modules/eventsource/messenger.go
@@ -0,0 +1,78 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package eventsource
+
+import "sync"
+
+// Messenger is a per uid message store
+type Messenger struct {
+	mutex    sync.Mutex
+	uid      int64
+	channels []chan *Event
+}
+
+// NewMessenger creates a messenger for a particular uid
+func NewMessenger(uid int64) *Messenger {
+	return &Messenger{
+		uid:      uid,
+		channels: [](chan *Event){},
+	}
+}
+
+// Register returns a new chan []byte
+func (m *Messenger) Register() <-chan *Event {
+	m.mutex.Lock()
+	// TODO: Limit the number of messengers per uid
+	channel := make(chan *Event, 1)
+	m.channels = append(m.channels, channel)
+	m.mutex.Unlock()
+	return channel
+}
+
+// Unregister removes the provider chan []byte
+func (m *Messenger) Unregister(channel <-chan *Event) bool {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	for i, toRemove := range m.channels {
+		if channel == toRemove {
+			m.channels = append(m.channels[:i], m.channels[i+1:]...)
+			close(toRemove)
+			break
+		}
+	}
+	return len(m.channels) == 0
+}
+
+// UnregisterAll removes all chan []byte
+func (m *Messenger) UnregisterAll() {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	for _, channel := range m.channels {
+		close(channel)
+	}
+	m.channels = nil
+}
+
+// SendMessage sends the message to all registered channels
+func (m *Messenger) SendMessage(message *Event) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	for i := range m.channels {
+		channel := m.channels[i]
+		select {
+		case channel <- message:
+		default:
+		}
+	}
+}
+
+// SendMessageBlocking sends the message to all registered channels and ensures it gets sent
+func (m *Messenger) SendMessageBlocking(message *Event) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	for i := range m.channels {
+		m.channels[i] <- message
+	}
+}
diff --git a/modules/setting/setting.go b/modules/setting/setting.go
index bf2ed6111e..de0f5cbad2 100644
--- a/modules/setting/setting.go
+++ b/modules/setting/setting.go
@@ -182,9 +182,10 @@ var (
 		UseServiceWorker      bool
 
 		Notification struct {
-			MinTimeout  time.Duration
-			TimeoutStep time.Duration
-			MaxTimeout  time.Duration
+			MinTimeout            time.Duration
+			TimeoutStep           time.Duration
+			MaxTimeout            time.Duration
+			EventSourceUpdateTime time.Duration
 		} `ini:"ui.notification"`
 
 		Admin struct {
@@ -216,13 +217,15 @@ var (
 		Themes:              []string{`gitea`, `arc-green`},
 		Reactions:           []string{`+1`, `-1`, `laugh`, `hooray`, `confused`, `heart`, `rocket`, `eyes`},
 		Notification: struct {
-			MinTimeout  time.Duration
-			TimeoutStep time.Duration
-			MaxTimeout  time.Duration
+			MinTimeout            time.Duration
+			TimeoutStep           time.Duration
+			MaxTimeout            time.Duration
+			EventSourceUpdateTime time.Duration
 		}{
-			MinTimeout:  10 * time.Second,
-			TimeoutStep: 10 * time.Second,
-			MaxTimeout:  60 * time.Second,
+			MinTimeout:            10 * time.Second,
+			TimeoutStep:           10 * time.Second,
+			MaxTimeout:            60 * time.Second,
+			EventSourceUpdateTime: 10 * time.Second,
 		},
 		Admin: struct {
 			UserPagingNum   int
diff --git a/modules/templates/helper.go b/modules/templates/helper.go
index 7827b3d0f3..374f13af0f 100644
--- a/modules/templates/helper.go
+++ b/modules/templates/helper.go
@@ -284,9 +284,10 @@ func NewFuncMap() []template.FuncMap {
 		},
 		"NotificationSettings": func() map[string]int {
 			return map[string]int{
-				"MinTimeout":  int(setting.UI.Notification.MinTimeout / time.Millisecond),
-				"TimeoutStep": int(setting.UI.Notification.TimeoutStep / time.Millisecond),
-				"MaxTimeout":  int(setting.UI.Notification.MaxTimeout / time.Millisecond),
+				"MinTimeout":            int(setting.UI.Notification.MinTimeout / time.Millisecond),
+				"TimeoutStep":           int(setting.UI.Notification.TimeoutStep / time.Millisecond),
+				"MaxTimeout":            int(setting.UI.Notification.MaxTimeout / time.Millisecond),
+				"EventSourceUpdateTime": int(setting.UI.Notification.EventSourceUpdateTime / time.Millisecond),
 			}
 		},
 		"contain": func(s []int64, id int64) bool {
diff --git a/routers/events/events.go b/routers/events/events.go
new file mode 100644
index 0000000000..a1131f29e3
--- /dev/null
+++ b/routers/events/events.go
@@ -0,0 +1,112 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package events
+
+import (
+	"net/http"
+	"time"
+
+	"code.gitea.io/gitea/modules/context"
+	"code.gitea.io/gitea/modules/eventsource"
+	"code.gitea.io/gitea/modules/graceful"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/routers/user"
+)
+
+// Events listens for events
+func Events(ctx *context.Context) {
+	// FIXME: Need to check if resp is actually a http.Flusher! - how though?
+
+	// Set the headers related to event streaming.
+	ctx.Resp.Header().Set("Content-Type", "text/event-stream")
+	ctx.Resp.Header().Set("Cache-Control", "no-cache")
+	ctx.Resp.Header().Set("Connection", "keep-alive")
+	ctx.Resp.Header().Set("X-Accel-Buffering", "no")
+	ctx.Resp.WriteHeader(http.StatusOK)
+
+	// Listen to connection close and un-register messageChan
+	notify := ctx.Req.Context().Done()
+	ctx.Resp.Flush()
+
+	shutdownCtx := graceful.GetManager().ShutdownContext()
+
+	uid := ctx.User.ID
+
+	messageChan := eventsource.GetManager().Register(uid)
+
+	unregister := func() {
+		eventsource.GetManager().Unregister(uid, messageChan)
+		// ensure the messageChan is closed
+		for {
+			_, ok := <-messageChan
+			if !ok {
+				break
+			}
+		}
+	}
+
+	if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
+		log.Error("Unable to write to EventStream: %v", err)
+		unregister()
+		return
+	}
+
+	timer := time.NewTicker(30 * time.Second)
+
+loop:
+	for {
+		select {
+		case <-timer.C:
+			event := &eventsource.Event{
+				Name: "ping",
+			}
+			_, err := event.WriteTo(ctx.Resp)
+			if err != nil {
+				log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
+				go unregister()
+				break loop
+			}
+			ctx.Resp.Flush()
+		case <-notify:
+			go unregister()
+			break loop
+		case <-shutdownCtx.Done():
+			go unregister()
+			break loop
+		case event, ok := <-messageChan:
+			if !ok {
+				break loop
+			}
+
+			// Handle logout
+			if event.Name == "logout" {
+				if ctx.Session.ID() == event.Data {
+					_, _ = (&eventsource.Event{
+						Name: "logout",
+						Data: "here",
+					}).WriteTo(ctx.Resp)
+					ctx.Resp.Flush()
+					go unregister()
+					user.HandleSignOut(ctx)
+					break loop
+				}
+				// Replace the event - we don't want to expose the session ID to the user
+				event = (&eventsource.Event{
+					Name: "logout",
+					Data: "elsewhere",
+				})
+			}
+
+			_, err := event.WriteTo(ctx.Resp)
+			if err != nil {
+				log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err)
+				go unregister()
+				break loop
+			}
+			ctx.Resp.Flush()
+		}
+	}
+	timer.Stop()
+}
diff --git a/routers/init.go b/routers/init.go
index 724bf84c10..72f55c809c 100644
--- a/routers/init.go
+++ b/routers/init.go
@@ -15,6 +15,7 @@ import (
 	"code.gitea.io/gitea/modules/auth/sso"
 	"code.gitea.io/gitea/modules/cache"
 	"code.gitea.io/gitea/modules/cron"
+	"code.gitea.io/gitea/modules/eventsource"
 	"code.gitea.io/gitea/modules/git"
 	"code.gitea.io/gitea/modules/highlight"
 	code_indexer "code.gitea.io/gitea/modules/indexer/code"
@@ -123,6 +124,7 @@ func GlobalInit(ctx context.Context) {
 		if err := task.Init(); err != nil {
 			log.Fatal("Failed to initialize task scheduler: %v", err)
 		}
+		eventsource.GetManager().Init()
 	}
 	if setting.EnableSQLite3 {
 		log.Info("SQLite3 Supported")
diff --git a/routers/routes/routes.go b/routers/routes/routes.go
index ac0f3f4f14..a51c1e17af 100644
--- a/routers/routes/routes.go
+++ b/routers/routes/routes.go
@@ -27,6 +27,7 @@ import (
 	"code.gitea.io/gitea/routers/admin"
 	apiv1 "code.gitea.io/gitea/routers/api/v1"
 	"code.gitea.io/gitea/routers/dev"
+	"code.gitea.io/gitea/routers/events"
 	"code.gitea.io/gitea/routers/org"
 	"code.gitea.io/gitea/routers/private"
 	"code.gitea.io/gitea/routers/repo"
@@ -340,6 +341,8 @@ func RegisterRoutes(m *macaron.Macaron) {
 		})
 	}, reqSignOut)
 
+	m.Any("/user/events", reqSignIn, events.Events)
+
 	m.Group("/login/oauth", func() {
 		m.Get("/authorize", bindIgnErr(auth.AuthorizationForm{}), user.AuthorizeOAuth)
 		m.Post("/grant", bindIgnErr(auth.GrantApplicationForm{}), user.GrantApplicationOAuth)
diff --git a/routers/user/auth.go b/routers/user/auth.go
index 169f3c453d..a8ff92ae59 100644
--- a/routers/user/auth.go
+++ b/routers/user/auth.go
@@ -16,6 +16,7 @@ import (
 	"code.gitea.io/gitea/modules/auth/oauth2"
 	"code.gitea.io/gitea/modules/base"
 	"code.gitea.io/gitea/modules/context"
+	"code.gitea.io/gitea/modules/eventsource"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/password"
 	"code.gitea.io/gitea/modules/recaptcha"
@@ -991,7 +992,8 @@ func LinkAccountPostRegister(ctx *context.Context, cpt *captcha.Captcha, form au
 	ctx.Redirect(setting.AppSubURL + "/user/login")
 }
 
-func handleSignOut(ctx *context.Context) {
+// HandleSignOut resets the session and sets the cookies
+func HandleSignOut(ctx *context.Context) {
 	_ = ctx.Session.Delete("uid")
 	_ = ctx.Session.Delete("uname")
 	_ = ctx.Session.Delete("socialId")
@@ -1006,7 +1008,13 @@ func handleSignOut(ctx *context.Context) {
 
 // SignOut sign out from login status
 func SignOut(ctx *context.Context) {
-	handleSignOut(ctx)
+	if ctx.User != nil {
+		eventsource.GetManager().SendMessageBlocking(ctx.User.ID, &eventsource.Event{
+			Name: "logout",
+			Data: ctx.Session.ID(),
+		})
+	}
+	HandleSignOut(ctx)
 	ctx.Redirect(setting.AppSubURL + "/")
 }
 
diff --git a/templates/base/head.tmpl b/templates/base/head.tmpl
index 0ecf6821c3..251b5eb8fc 100644
--- a/templates/base/head.tmpl
+++ b/templates/base/head.tmpl
@@ -98,6 +98,7 @@
 				MinTimeout: {{NotificationSettings.MinTimeout}},
 				TimeoutStep:  {{NotificationSettings.TimeoutStep}},
 				MaxTimeout: {{NotificationSettings.MaxTimeout}},
+				EventSourceUpdateTime: {{NotificationSettings.EventSourceUpdateTime}},
 			},
       {{if .RequireTribute}}
 			tributeValues: [
diff --git a/web_src/js/features/notification.js b/web_src/js/features/notification.js
index 3f2af4de91..8b843e9806 100644
--- a/web_src/js/features/notification.js
+++ b/web_src/js/features/notification.js
@@ -19,21 +19,53 @@ export function initNotificationsTable() {
 }
 
 export function initNotificationCount() {
-  if (NotificationSettings.MinTimeout <= 0) {
+  const notificationCount = $('.notification_count');
+
+  if (!notificationCount.length) {
     return;
   }
 
-  const notificationCount = $('.notification_count');
-
-  if (notificationCount.length > 0) {
-    const fn = (timeout, lastCount) => {
-      setTimeout(async () => {
-        await updateNotificationCountWithCallback(fn, timeout, lastCount);
-      }, timeout);
-    };
+  if (NotificationSettings.EventSourceUpdateTime > 0 && !!window.EventSource) {
+    // Try to connect to the event source first
+    const source = new EventSource(`${AppSubUrl}/user/events`);
+    source.addEventListener('notification-count', async (e) => {
+      try {
+        const data = JSON.parse(e.data);
+
+        const notificationCount = $('.notification_count');
+        if (data.Count === 0) {
+          notificationCount.addClass('hidden');
+        } else {
+          notificationCount.removeClass('hidden');
+        }
+
+        notificationCount.text(`${data.Count}`);
+        await updateNotificationTable();
+      } catch (error) {
+        console.error(error);
+      }
+    });
+    source.addEventListener('logout', async (e) => {
+      if (e.data !== 'here') {
+        return;
+      }
+      source.close();
+      window.location.href = AppSubUrl;
+    });
+    return;
+  }
 
-    fn(NotificationSettings.MinTimeout, notificationCount.text());
+  if (NotificationSettings.MinTimeout <= 0) {
+    return;
   }
+
+  const fn = (timeout, lastCount) => {
+    setTimeout(async () => {
+      await updateNotificationCountWithCallback(fn, timeout, lastCount);
+    }, timeout);
+  };
+
+  fn(NotificationSettings.MinTimeout, notificationCount.text());
 }
 
 async function updateNotificationCountWithCallback(callback, timeout, lastCount) {
@@ -54,9 +86,14 @@ async function updateNotificationCountWithCallback(callback, timeout, lastCount)
   }
 
   callback(timeout, newCount);
+  if (needsUpdate) {
+    await updateNotificationTable();
+  }
+}
 
+async function updateNotificationTable() {
   const notificationDiv = $('#notification_div');
-  if (notificationDiv.length > 0 && needsUpdate) {
+  if (notificationDiv.length > 0) {
     const data = await $.ajax({
       type: 'GET',
       url: `${AppSubUrl}/notifications?${notificationDiv.data('params')}`,