@@ -2,6 +2,7 @@ package deploylog
2
2
3
3
import (
4
4
"fmt"
5
+ "io"
5
6
"sort"
6
7
"time"
7
8
@@ -11,6 +12,7 @@ import (
11
12
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12
13
"k8s.io/apimachinery/pkg/labels"
13
14
"k8s.io/apimachinery/pkg/runtime"
15
+ "k8s.io/apimachinery/pkg/runtime/schema"
14
16
"k8s.io/apimachinery/pkg/util/wait"
15
17
apirequest "k8s.io/apiserver/pkg/endpoints/request"
16
18
genericrest "k8s.io/apiserver/pkg/registry/generic/rest"
@@ -19,8 +21,6 @@ import (
19
21
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
20
22
"k8s.io/kubernetes/pkg/controller"
21
23
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
22
- kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
23
- "k8s.io/kubernetes/pkg/registry/core/pod"
24
24
25
25
appsapi "github.com/openshift/origin/pkg/apps/apis/apps"
26
26
"github.com/openshift/origin/pkg/apps/apis/apps/validation"
@@ -36,29 +36,16 @@ const (
36
36
defaultInterval = 1 * time .Second
37
37
)
38
38
39
- // podGetter implements the ResourceGetter interface. Used by LogLocation to
40
- // retrieve the deployer pod
41
- type podGetter struct {
42
- pn kcoreclient.PodsGetter
43
- }
44
-
45
- // Get is responsible for retrieving the deployer pod
46
- func (g * podGetter ) Get (ctx apirequest.Context , name string , options * metav1.GetOptions ) (runtime.Object , error ) {
47
- namespace , ok := apirequest .NamespaceFrom (ctx )
48
- if ! ok {
49
- return nil , errors .NewBadRequest ("namespace parameter required." )
50
- }
51
- return g .pn .Pods (namespace ).Get (name , * options )
52
- }
53
-
54
39
// REST is an implementation of RESTStorage for the api server.
55
40
type REST struct {
56
- dn appsclient.DeploymentConfigsGetter
57
- rn kcoreclient.ReplicationControllersGetter
58
- pn kcoreclient.PodsGetter
59
- connInfo kubeletclient.ConnectionInfoGetter
60
- timeout time.Duration
61
- interval time.Duration
41
+ dcClient appsclient.DeploymentConfigsGetter
42
+ rcClient kcoreclient.ReplicationControllersGetter
43
+ podClient kcoreclient.PodsGetter
44
+ timeout time.Duration
45
+ interval time.Duration
46
+
47
+ // for unit testing
48
+ getLogsFn func (podNamespace , podName string , logOpts * kapi.PodLogOptions ) (runtime.Object , error )
62
49
}
63
50
64
51
// REST implements GetterWithOptions
@@ -68,15 +55,17 @@ var _ = rest.GetterWithOptions(&REST{})
68
55
// one for deployments (replication controllers) and one for pods to get the necessary
69
56
// attributes to assemble the URL to which the request shall be redirected in order to
70
57
// get the deployment logs.
71
- func NewREST (dn appsclient.DeploymentConfigsGetter , rn kcoreclient.ReplicationControllersGetter , pn kcoreclient.PodsGetter , connectionInfo kubeletclient.ConnectionInfoGetter ) * REST {
72
- return & REST {
73
- dn : dn ,
74
- rn : rn ,
75
- pn : pn ,
76
- connInfo : connectionInfo ,
77
- timeout : defaultTimeout ,
78
- interval : defaultInterval ,
58
+ func NewREST (dcClient appsclient.DeploymentConfigsGetter , rcClient kcoreclient.ReplicationControllersGetter , podClient kcoreclient.PodsGetter ) * REST {
59
+ r := & REST {
60
+ dcClient : dcClient ,
61
+ rcClient : rcClient ,
62
+ podClient : podClient ,
63
+ timeout : defaultTimeout ,
64
+ interval : defaultInterval ,
79
65
}
66
+ r .getLogsFn = r .getLogs
67
+
68
+ return r
80
69
}
81
70
82
71
// NewGetOptions returns a new options object for deployment logs
@@ -108,7 +97,7 @@ func (r *REST) Get(ctx apirequest.Context, name string, opts runtime.Object) (ru
108
97
109
98
// Fetch deploymentConfig and check latest version; if 0, there are no deployments
110
99
// for this config
111
- config , err := r .dn .DeploymentConfigs (namespace ).Get (name , metav1.GetOptions {})
100
+ config , err := r .dcClient .DeploymentConfigs (namespace ).Get (name , metav1.GetOptions {})
112
101
if err != nil {
113
102
return nil , errors .NewNotFound (appsapi .Resource ("deploymentconfig" ), name )
114
103
}
@@ -154,11 +143,11 @@ func (r *REST) Get(ctx apirequest.Context, name string, opts runtime.Object) (ru
154
143
}
155
144
glog .V (4 ).Infof ("Deployment %s is in %s state, waiting for it to start..." , appsutil .LabelForDeployment (target ), status )
156
145
157
- if err := appsutil .WaitForRunningDeployerPod (r .pn , target , r .timeout ); err != nil {
146
+ if err := appsutil .WaitForRunningDeployerPod (r .podClient , target , r .timeout ); err != nil {
158
147
return nil , errors .NewBadRequest (fmt .Sprintf ("failed to run deployer pod %s: %v" , podName , err ))
159
148
}
160
149
161
- latest , ok , err := registry .WaitForRunningDeployment (r .rn , target , r .timeout )
150
+ latest , ok , err := registry .WaitForRunningDeployment (r .rcClient , target , r .timeout )
162
151
if err != nil {
163
152
return nil , errors .NewBadRequest (fmt .Sprintf ("unable to wait for deployment %s to run: %v" , appsutil .LabelForDeployment (target ), err ))
164
153
}
@@ -179,20 +168,46 @@ func (r *REST) Get(ctx apirequest.Context, name string, opts runtime.Object) (ru
179
168
}
180
169
181
170
logOpts := appsapi .DeploymentToPodLogOptions (deployLogOpts )
182
- location , transport , err := pod .LogLocation (& podGetter {r .pn }, r .connInfo , ctx , podName , logOpts )
171
+ return r .getLogsFn (namespace , podName , logOpts )
172
+ }
173
+
174
+ func (r * REST ) getLogs (podNamespace , podName string , logOpts * kapi.PodLogOptions ) (runtime.Object , error ) {
175
+ logRequest := r .podClient .Pods (podNamespace ).GetLogs (podName , logOpts )
176
+
177
+ readerCloser , err := logRequest .Stream ()
183
178
if err != nil {
184
- return nil , errors . NewBadRequest ( err . Error ())
179
+ return nil , err
185
180
}
186
181
187
- return & genericrest.LocationStreamer {
188
- Location : location ,
189
- Transport : transport ,
190
- ContentType : "text/plain" ,
191
- Flush : deployLogOpts .Follow ,
192
- ResponseChecker : genericrest .NewGenericHttpResponseChecker (kapi .Resource ("pod" ), podName ),
182
+ return & passThroughStreamer {
183
+ In : readerCloser ,
184
+ Flush : logOpts .Follow ,
185
+ ContentType : "text/plain" ,
193
186
}, nil
194
187
}
195
188
189
+ type passThroughStreamer struct {
190
+ In io.ReadCloser
191
+ Flush bool
192
+ ContentType string
193
+ }
194
+
195
+ // a PipeStreamer must implement a rest.ResourceStreamer
196
+ var _ rest.ResourceStreamer = & passThroughStreamer {}
197
+
198
+ func (obj * passThroughStreamer ) GetObjectKind () schema.ObjectKind {
199
+ return schema .EmptyObjectKind
200
+ }
201
+
202
+ func (obj * passThroughStreamer ) DeepCopyObject () runtime.Object {
203
+ panic ("passThroughStreamer does not implement DeepCopyObject" )
204
+ }
205
+
206
+ // InputStream returns a stream with the contents of the embedded pipe.
207
+ func (s * passThroughStreamer ) InputStream (apiVersion , acceptHeader string ) (stream io.ReadCloser , flush bool , contentType string , err error ) {
208
+ return s .In , s .Flush , s .ContentType , nil
209
+ }
210
+
196
211
// waitForExistingDeployment will use the timeout to wait for a deployment to appear.
197
212
func (r * REST ) waitForExistingDeployment (namespace , name string ) (* kapi.ReplicationController , error ) {
198
213
var (
@@ -201,7 +216,7 @@ func (r *REST) waitForExistingDeployment(namespace, name string) (*kapi.Replicat
201
216
)
202
217
203
218
condition := func () (bool , error ) {
204
- target , err = r .rn .ReplicationControllers (namespace ).Get (name , metav1.GetOptions {})
219
+ target , err = r .rcClient .ReplicationControllers (namespace ).Get (name , metav1.GetOptions {})
205
220
switch {
206
221
case errors .IsNotFound (err ):
207
222
return false , nil
@@ -224,7 +239,7 @@ func (r *REST) returnApplicationPodName(target *kapi.ReplicationController) (str
224
239
selector := labels .SelectorFromValidatedSet (labels .Set (target .Spec .Selector ))
225
240
sortBy := func (pods []* kapiv1.Pod ) sort.Interface { return controller .ByLogging (pods ) }
226
241
227
- firstPod , _ , err := kcmdutil .GetFirstPod (r .pn , target .Namespace , selector .String (), r .timeout , sortBy )
242
+ firstPod , _ , err := kcmdutil .GetFirstPod (r .podClient , target .Namespace , selector .String (), r .timeout , sortBy )
228
243
if err != nil {
229
244
return "" , errors .NewInternalError (err )
230
245
}
0 commit comments