Skip to content

Commit cafa14f

Browse files
committed
Add pausing deployments during upgrades
1 parent 5089955 commit cafa14f

File tree

6 files changed

+806
-0
lines changed

6 files changed

+806
-0
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"time"
8+
9+
"github.com/sirupsen/logrus"
10+
"github.com/stakater/Reloader/internal/pkg/options"
11+
"github.com/stakater/Reloader/pkg/kube"
12+
app "k8s.io/api/apps/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
patchtypes "k8s.io/apimachinery/pkg/types"
15+
)
16+
17+
// activeTimers holds the pending resume timer of every deployment for which
// CreateResumeTimer has armed one, keyed by the string built by getTimerKey.
// Entries are added by CreateResumeTimer and removed by ResumeDeployment.
// NOTE(review): this is a plain map with no lock. The time.AfterFunc callback
// created in CreateResumeTimer calls ResumeDeployment (which deletes from the
// map) on its own goroutine, while PauseDeployment/CreateResumeTimer read and
// write it from the caller's goroutine — presumably this needs a mutex; confirm.
18+
var activeTimers = make(map[string]*time.Timer)
19+
20+
// getTimerKey builds the activeTimers map key for a deployment by joining
// its namespace and name with a slash ("<namespace>/<deploymentName>").
func getTimerKey(namespace, deploymentName string) string {
	key := fmt.Sprintf("%s/%s", namespace, deploymentName)
	return key
}
24+
25+
// Checks if a deployment is currently paused
26+
func IsPaused(deployment *app.Deployment) bool {
27+
return deployment.Spec.Paused
28+
}
29+
30+
// Deployment paused by reloader ?
31+
func IsPausedByReloader(deployment *app.Deployment) bool {
32+
if IsPaused(deployment) {
33+
pausedAtAnnotationValue := deployment.Annotations[options.PauseDeploymentTimeAnnotation]
34+
return pausedAtAnnotationValue != ""
35+
}
36+
return false
37+
}
38+
39+
// Returns the time, the deployment was paused by reloader, nil otherwise
40+
func GetPauseStartTime(deployment *app.Deployment) (*time.Time, error) {
41+
if !IsPausedByReloader(deployment) {
42+
return nil, nil
43+
}
44+
45+
pausedAtStr := deployment.Annotations[options.PauseDeploymentTimeAnnotation]
46+
parsedTime, err := time.Parse(time.RFC3339, pausedAtStr)
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
return &parsedTime, nil
52+
}
53+
54+
// ParsePauseDuration parses the pause interval value and returns a time.Duration
55+
func ParsePauseDuration(pauseIntervalValue string) (time.Duration, error) {
56+
pauseDuration, err := time.ParseDuration(pauseIntervalValue)
57+
if err != nil {
58+
logrus.Warnf("Failed to parse pause interval value '%s': %v", pauseIntervalValue, err)
59+
return 0, err
60+
}
61+
return pauseDuration, nil
62+
}
63+
64+
// Pauses a deployment for a specified duration and creates a timer to resume it
65+
// after the specified duration
66+
func PauseDeployment(deployment *app.Deployment, clients kube.Clients, namespace, pauseIntervalValue string) (*app.Deployment, error) {
67+
deploymentName := deployment.Name
68+
pauseDuration, err := ParsePauseDuration(pauseIntervalValue)
69+
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
if !IsPaused(deployment) {
75+
logrus.Infof("Pausing Deployment '%s' in namespace '%s' for %s", deploymentName, namespace, pauseDuration)
76+
77+
deploymentFuncs := GetDeploymentRollingUpgradeFuncs()
78+
79+
pausePatch, err := CreatePausePatch()
80+
if err != nil {
81+
logrus.Errorf("Failed to create pause patch for deployment '%s': %v", deploymentName, err)
82+
return deployment, err
83+
}
84+
85+
err = deploymentFuncs.PatchFunc(clients, namespace, deployment, patchtypes.StrategicMergePatchType, pausePatch)
86+
87+
if err != nil {
88+
logrus.Errorf("Failed to patch deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
89+
return deployment, err
90+
}
91+
92+
updatedDeployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
93+
94+
CreateResumeTimer(deployment, clients, namespace, pauseDuration)
95+
return updatedDeployment, err
96+
}
97+
98+
if !IsPausedByReloader(deployment) {
99+
logrus.Infof("Deployment '%s' in namespace '%s' already paused", deploymentName, namespace)
100+
return deployment, nil
101+
}
102+
103+
// Deployment has already been paused by reloader, check for timer
104+
logrus.Debugf("Deployment '%s' in namespace '%s' is already paused by reloader", deploymentName, namespace)
105+
106+
timerKey := getTimerKey(namespace, deploymentName)
107+
_, timerExists := activeTimers[timerKey]
108+
109+
if !timerExists {
110+
logrus.Warnf("Timer does not exist for already paused deployment '%s' in namespace '%s', creating new one",
111+
deploymentName, namespace)
112+
HandleMissingTimer(deployment, pauseDuration, clients, namespace)
113+
}
114+
return deployment, nil
115+
}
116+
117+
// Handles the case where missing timers for deployments that have been paused by reloader.
118+
// Could occur after new leader election or reloader restart
119+
func HandleMissingTimer(deployment *app.Deployment, pauseDuration time.Duration, clients kube.Clients, namespace string) {
120+
deploymentName := deployment.Name
121+
pauseStartTime, err := GetPauseStartTime(deployment)
122+
if err != nil {
123+
logrus.Errorf("Error parsing pause start time for deployment '%s' in namespace '%s': %v. Resuming deployment immediately",
124+
deploymentName, namespace, err)
125+
ResumeDeployment(deployment, namespace, clients)
126+
return
127+
}
128+
129+
if pauseStartTime == nil {
130+
return
131+
}
132+
133+
elapsedPauseTime := time.Since(*pauseStartTime)
134+
remainingPauseTime := pauseDuration - elapsedPauseTime
135+
136+
if remainingPauseTime <= 0 {
137+
logrus.Infof("Pause period for deployment '%s' in namespace '%s' has expired. Resuming immediately",
138+
deploymentName, namespace)
139+
ResumeDeployment(deployment, namespace, clients)
140+
return
141+
}
142+
143+
logrus.Infof("Creating missing timer for already paused deployment '%s' in namespace '%s' with remaining time %s",
144+
deploymentName, namespace, remainingPauseTime)
145+
CreateResumeTimer(deployment, clients, namespace, remainingPauseTime)
146+
}
147+
148+
// CreateResumeTimer creates a timer to resume the deployment after the specified duration
149+
func CreateResumeTimer(deployment *app.Deployment, clients kube.Clients, namespace string, pauseDuration time.Duration) {
150+
deploymentName := deployment.Name
151+
timerKey := getTimerKey(namespace, deployment.Name)
152+
153+
// Check if there's an existing timer for this deployment
154+
if _, exists := activeTimers[timerKey]; exists {
155+
logrus.Debugf("Timer already exists for deployment '%s' in namespace '%s', Skipping creation",
156+
deploymentName, namespace)
157+
return
158+
}
159+
160+
// Create and store the new timer
161+
timer := time.AfterFunc(pauseDuration, func() {
162+
ResumeDeployment(deployment, namespace, clients)
163+
})
164+
165+
// Add the new timer to the map
166+
activeTimers[timerKey] = timer
167+
168+
logrus.Debugf("Created pause timer for deployment '%s' in namespace '%s' with duration %s",
169+
deploymentName, namespace, pauseDuration)
170+
}
171+
172+
// ResumeDeployment resumes a deployment that has been paused by reloader
173+
func ResumeDeployment(deployment *app.Deployment, namespace string, clients kube.Clients) {
174+
deploymentName := deployment.Name
175+
176+
currentDeployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
177+
178+
if err != nil {
179+
logrus.Errorf("Failed to get deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
180+
return
181+
}
182+
183+
if !IsPausedByReloader(currentDeployment) {
184+
logrus.Infof("Deployment '%s' in namespace '%s' not paused by Reloader. Skipping resume", deploymentName, namespace)
185+
return
186+
}
187+
188+
deploymentFuncs := GetDeploymentRollingUpgradeFuncs()
189+
190+
resumePatch, err := CreateResumePatch()
191+
if err != nil {
192+
logrus.Errorf("Failed to create resume patch for deployment '%s': %v", deploymentName, err)
193+
return
194+
}
195+
196+
// Remove the timer
197+
timerKey := getTimerKey(namespace, deploymentName)
198+
if timer, exists := activeTimers[timerKey]; exists {
199+
timer.Stop()
200+
delete(activeTimers, timerKey)
201+
logrus.Debugf("Removed pause timer for deployment '%s' in namespace '%s'", deploymentName, namespace)
202+
}
203+
204+
err = deploymentFuncs.PatchFunc(clients, namespace, currentDeployment, patchtypes.StrategicMergePatchType, resumePatch)
205+
206+
if err != nil {
207+
logrus.Errorf("Failed to resume deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
208+
return
209+
}
210+
211+
logrus.Infof("Successfully resumed deployment '%s' in namespace '%s'", deploymentName, namespace)
212+
}
213+
214+
func CreatePausePatch() ([]byte, error) {
215+
patchData := map[string]interface{}{
216+
"spec": map[string]interface{}{
217+
"paused": true,
218+
},
219+
"metadata": map[string]interface{}{
220+
"annotations": map[string]string{
221+
options.PauseDeploymentTimeAnnotation: time.Now().Format(time.RFC3339),
222+
},
223+
},
224+
}
225+
226+
return json.Marshal(patchData)
227+
}
228+
229+
func CreateResumePatch() ([]byte, error) {
230+
patchData := map[string]interface{}{
231+
"spec": map[string]interface{}{
232+
"paused": false,
233+
},
234+
"metadata": map[string]interface{}{
235+
"annotations": map[string]interface{}{
236+
options.PauseDeploymentTimeAnnotation: nil,
237+
},
238+
},
239+
}
240+
241+
return json.Marshal(patchData)
242+
}

0 commit comments

Comments
 (0)