Merge branch 'master' into replication

This commit is contained in:
Alain Jobart 2015-08-05 20:42:09 -07:00
Parents ed0c4c9cf5 d0de5242c9
Commit d56b66ea1c
6 changed files with 213 additions and 143 deletions

View file

@@ -5,19 +5,21 @@
package automation
import (
"github.com/golang/protobuf/proto"
pb "github.com/youtube/vitess/go/vt/proto/automation"
)
// ClusterOperationInstance is a runtime type which enhances the protobuf message "ClusterOperation" with runtime specific data.
// Unlike the protobuf message, the additional runtime data will not be part of a checkpoint.
// Methods of this struct are not thread-safe.
type ClusterOperationInstance struct {
pb.ClusterOperation
taskIDGenerator *IDGenerator
}
// NewClusterOperationInstance creates a new cluster operation instance with one initial task.
func NewClusterOperationInstance(clusterOpID string, initialTask *pb.TaskContainer, taskIDGenerator *IDGenerator) *ClusterOperationInstance {
c := &ClusterOperationInstance{
func NewClusterOperationInstance(clusterOpID string, initialTask *pb.TaskContainer, taskIDGenerator *IDGenerator) ClusterOperationInstance {
c := ClusterOperationInstance{
pb.ClusterOperation{
Id: clusterOpID,
SerialTasks: []*pb.TaskContainer{},
@@ -29,20 +31,9 @@ func NewClusterOperationInstance(clusterOpID string, initialTask *pb.TaskContain
return c
}
// addMissingTaskID assigns a task id to each task in "tc".
func (c *ClusterOperationInstance) addMissingTaskID(tc []*pb.TaskContainer) {
for _, taskContainer := range tc {
for _, task := range taskContainer.ParallelTasks {
if task.Id == "" {
task.Id = c.taskIDGenerator.GetNextID()
}
}
}
}
// InsertTaskContainers inserts "newTaskContainers" at pos in the current list of task containers. Existing task containers will be moved after the new task containers.
func (c *ClusterOperationInstance) InsertTaskContainers(newTaskContainers []*pb.TaskContainer, pos int) {
c.addMissingTaskID(newTaskContainers)
AddMissingTaskID(newTaskContainers, c.taskIDGenerator)
newSerialTasks := make([]*pb.TaskContainer, len(c.SerialTasks)+len(newTaskContainers))
copy(newSerialTasks, c.SerialTasks[:pos])
@@ -50,3 +41,11 @@ func (c *ClusterOperationInstance) InsertTaskContainers(newTaskContainers []*pb.
copy(newSerialTasks[pos+len(newTaskContainers):], c.SerialTasks[pos:])
c.SerialTasks = newSerialTasks
}
// Clone creates a deep copy of the inner protobuf.
// Other elements e.g. taskIDGenerator are not deep-copied.
func (c ClusterOperationInstance) Clone() ClusterOperationInstance {
var clone = c
clone.ClusterOperation = *(proto.Clone(&c.ClusterOperation).(*pb.ClusterOperation))
return clone
}
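Note on the change above: ClusterOperationInstance moves from pointer to value semantics, and Clone() deep-copies only the embedded protobuf. A plain Go struct copy still shares any referenced slices (such as SerialTasks), which is why the explicit deep copy matters. A minimal, self-contained sketch of the pattern, using simplified stand-in types rather than the real vitess/proto API:

package main

import "fmt"

// ClusterOperation is a hypothetical stand-in for the protobuf message.
type ClusterOperation struct {
	Id          string
	SerialTasks []string
}

// Instance mirrors ClusterOperationInstance: the message is embedded by value.
type Instance struct {
	ClusterOperation
}

// Clone deep-copies the embedded message, as proto.Clone does in the real code.
func (i Instance) Clone() Instance {
	clone := i // copies scalar fields, but SerialTasks still aliases the original slice
	clone.SerialTasks = append([]string(nil), i.SerialTasks...)
	return clone
}

func main() {
	a := Instance{ClusterOperation{Id: "op1", SerialTasks: []string{"t1"}}}
	shallow := a      // plain value copy: the slice header is shared
	deep := a.Clone() // deep copy: independent backing array

	a.SerialTasks[0] = "mutated"
	fmt.Println(shallow.SerialTasks[0]) // "mutated" -- shared state
	fmt.Println(deep.SerialTasks[0])    // "t1" -- isolated
}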

View file

@@ -10,6 +10,7 @@ Package automation contains code to execute high-level cluster operations
package automation
import (
"errors"
"fmt"
"sync"
@@ -37,7 +38,7 @@ type Scheduler struct {
// Guarded by "mu".
registeredClusterOperations map[string]bool
// Guarded by "mu".
toBeScheduledClusterOperations chan *ClusterOperationInstance
toBeScheduledClusterOperations chan ClusterOperationInstance
// Guarded by "mu".
state schedulerState
@@ -49,9 +50,13 @@ type Scheduler struct {
muOpList sync.Mutex
// Guarded by "muOpList".
activeClusterOperations map[string]*ClusterOperationInstance
// The key of the map is ClusterOperationInstance.ID.
// This map contains a copy of the ClusterOperationInstance which is currently processed.
// The scheduler may update the copy with the latest status.
activeClusterOperations map[string]ClusterOperationInstance
// Guarded by "muOpList".
finishedClusterOperations map[string]*ClusterOperationInstance
// The key of the map is ClusterOperationInstance.ID.
finishedClusterOperations map[string]ClusterOperationInstance
}
// NewScheduler creates a new instance.
@@ -63,12 +68,12 @@ func NewScheduler() (*Scheduler, error) {
s := &Scheduler{
registeredClusterOperations: defaultClusterOperations,
idGenerator: IDGenerator{},
toBeScheduledClusterOperations: make(chan *ClusterOperationInstance, 10),
toBeScheduledClusterOperations: make(chan ClusterOperationInstance, 10),
state: stateNotRunning,
taskCreator: defaultTaskCreator,
pendingOpsWg: &sync.WaitGroup{},
activeClusterOperations: make(map[string]*ClusterOperationInstance),
finishedClusterOperations: make(map[string]*ClusterOperationInstance),
activeClusterOperations: make(map[string]ClusterOperationInstance),
finishedClusterOperations: make(map[string]ClusterOperationInstance),
}
return s, nil
@@ -106,7 +111,7 @@ func (s *Scheduler) processRequestsLoop() {
log.Infof("Stopped processing loop for ClusterOperations.")
}
func (s *Scheduler) processClusterOperation(clusterOp *ClusterOperationInstance) {
func (s *Scheduler) processClusterOperation(clusterOp ClusterOperationInstance) {
if clusterOp.State == pb.ClusterOperationState_CLUSTER_OPERATION_DONE {
log.Infof("ClusterOperation: %v skipping because it is already done. Details: %v", clusterOp.Id, clusterOp)
return
@@ -114,77 +119,86 @@ func (s *Scheduler) processClusterOperation(clusterOp *ClusterOperationInstance)
log.Infof("ClusterOperation: %v running. Details: %v", clusterOp.Id, clusterOp)
var lastTaskError string
clusterOpLoop:
for i := 0; i < len(clusterOp.SerialTasks); i++ {
taskContainer := clusterOp.SerialTasks[i]
for _, taskProto := range taskContainer.ParallelTasks {
if taskProto.State == pb.TaskState_DONE {
if taskProto.Error != "" {
log.Errorf("Task: %v (%v/%v) failed before. Aborting the ClusterOperation. Error: %v Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, taskProto.Error, taskProto)
lastTaskError = taskProto.Error
break
} else {
log.Infof("Task: %v (%v/%v) skipped because it is already done. Full Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, taskProto)
}
}
task, err := s.createTaskInstance(taskProto.Name)
newTaskContainers, output, err := s.runTask(taskProto, clusterOp.Id)
if err != nil {
log.Errorf("Task: %v (%v/%v) could not be instantiated. Error: %v Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, err, taskProto)
MarkTaskFailed(taskProto, "", err)
lastTaskError = err.Error()
break
MarkTaskFailed(taskProto, output, err)
clusterOp.Error = err.Error()
break clusterOpLoop
} else {
MarkTaskSucceeded(taskProto, output)
}
taskProto.State = pb.TaskState_RUNNING
log.Infof("Task: %v (%v/%v) running. Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, taskProto)
newTaskContainers, output, errRun := task.Run(taskProto.Parameters)
log.Infof("Task: %v (%v/%v) finished. newTaskContainers: %v, output: %v, error: %v", taskProto.Name, clusterOp.Id, taskProto.Id, newTaskContainers, output, errRun)
if errRun != nil {
MarkTaskFailed(taskProto, output, errRun)
lastTaskError = errRun.Error()
break
}
MarkTaskSucceeded(taskProto, output)
if newTaskContainers != nil {
// Make sure all new tasks do not miss any required parameters.
for _, newTaskContainer := range newTaskContainers {
for _, newTaskProto := range newTaskContainer.ParallelTasks {
err := s.validateTaskSpecification(newTaskProto.Name, newTaskProto.Parameters)
if err != nil {
log.Errorf("Task: %v (%v/%v) emitted a new task which is not valid. Error: %v Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, err, newTaskProto)
MarkTaskFailed(taskProto, output, err)
lastTaskError = err.Error()
break
}
}
err := s.validateTaskContainers(newTaskContainers)
if err != nil {
log.Errorf("Task: %v (%v/%v) emitted a new task which is not valid. Error: %v", taskProto.Name, clusterOp.Id, taskProto.Id, err)
MarkTaskFailed(taskProto, output, err)
clusterOp.Error = err.Error()
break clusterOpLoop
}
if lastTaskError == "" {
clusterOp.InsertTaskContainers(newTaskContainers, i+1)
log.Infof("ClusterOperation: %v %d new task containers added by %v (%v/%v). Updated ClusterOperation: %v",
clusterOp.Id, len(newTaskContainers), taskProto.Name, clusterOp.Id, taskProto.Id, clusterOp)
}
clusterOp.InsertTaskContainers(newTaskContainers, i+1)
log.Infof("ClusterOperation: %v %d new task containers added by %v (%v/%v). Updated ClusterOperation: %v",
clusterOp.Id, len(newTaskContainers), taskProto.Name, clusterOp.Id, taskProto.Id, clusterOp)
}
s.Checkpoint(clusterOp)
}
}
clusterOp.State = pb.ClusterOperationState_CLUSTER_OPERATION_DONE
if lastTaskError != "" {
clusterOp.Error = lastTaskError
}
log.Infof("ClusterOperation: %v finished. Details: %v", clusterOp.Id, clusterOp)
s.Checkpoint(clusterOp)
// Move operation from active to finished.
s.muOpList.Lock()
if s.activeClusterOperations[clusterOp.Id] != clusterOp {
defer s.muOpList.Unlock()
if _, ok := s.activeClusterOperations[clusterOp.Id]; !ok {
panic("Pending ClusterOperation was not recorded as active, but should have.")
}
delete(s.activeClusterOperations, clusterOp.Id)
s.finishedClusterOperations[clusterOp.Id] = clusterOp
s.muOpList.Unlock()
}
func (s *Scheduler) runTask(taskProto *pb.Task, clusterOpID string) ([]*pb.TaskContainer, string, error) {
if taskProto.State == pb.TaskState_DONE {
// Task is already done (e.g. because we resume from a checkpoint).
if taskProto.Error != "" {
log.Errorf("Task: %v (%v/%v) failed before. Aborting the ClusterOperation. Error: %v Details: %v", taskProto.Name, clusterOpID, taskProto.Id, taskProto.Error, taskProto)
return nil, "", errors.New(taskProto.Error)
}
log.Infof("Task: %v (%v/%v) skipped because it is already done. Full Details: %v", taskProto.Name, clusterOpID, taskProto.Id, taskProto)
return nil, taskProto.Output, nil
}
task, err := s.createTaskInstance(taskProto.Name)
if err != nil {
log.Errorf("Task: %v (%v/%v) could not be instantiated. Error: %v Details: %v", taskProto.Name, clusterOpID, taskProto.Id, err, taskProto)
return nil, "", err
}
taskProto.State = pb.TaskState_RUNNING
log.Infof("Task: %v (%v/%v) running. Details: %v", taskProto.Name, clusterOpID, taskProto.Id, taskProto)
newTaskContainers, output, err := task.Run(taskProto.Parameters)
log.Infof("Task: %v (%v/%v) finished. newTaskContainers: %v, output: %v, error: %v", taskProto.Name, clusterOpID, taskProto.Id, newTaskContainers, output, err)
return newTaskContainers, output, err
}
func (s *Scheduler) validateTaskContainers(newTaskContainers []*pb.TaskContainer) error {
for _, newTaskContainer := range newTaskContainers {
for _, newTaskProto := range newTaskContainer.ParallelTasks {
err := s.validateTaskSpecification(newTaskProto.Name, newTaskProto.Parameters)
if err != nil {
return fmt.Errorf("Error: %v Task: %v", err, newTaskProto)
}
}
}
return nil
}
func defaultTaskCreator(taskName string) Task {
@@ -267,9 +281,9 @@ func (s *Scheduler) EnqueueClusterOperation(ctx context.Context, req *pb.Enqueue
clusterOp := NewClusterOperationInstance(clusterOpID, initialTask, &taskIDGenerator)
s.muOpList.Lock()
s.toBeScheduledClusterOperations <- clusterOp
s.activeClusterOperations[clusterOpID] = clusterOp
s.muOpList.Unlock()
s.toBeScheduledClusterOperations <- clusterOp
return &pb.EnqueueClusterOperationResponse{
Id: clusterOp.Id,
@@ -277,9 +291,9 @@ func (s *Scheduler) EnqueueClusterOperation(ctx context.Context, req *pb.Enqueue
}
// findClusterOp checks whether a given ClusterOperation ID is in the list of active or finished operations.
func (s *Scheduler) findClusterOp(id string) (*ClusterOperationInstance, error) {
func (s *Scheduler) findClusterOp(id string) (ClusterOperationInstance, error) {
var ok bool
var clusterOp *ClusterOperationInstance
var clusterOp ClusterOperationInstance
s.muOpList.Lock()
defer s.muOpList.Unlock()
@@ -288,9 +302,18 @@ func (s *Scheduler) findClusterOp(id string) (*ClusterOperationInstance, error)
clusterOp, ok = s.finishedClusterOperations[id]
}
if !ok {
return nil, fmt.Errorf("ClusterOperation with id: %v not found", id)
return clusterOp, fmt.Errorf("ClusterOperation with id: %v not found", id)
}
return clusterOp, nil
return clusterOp.Clone(), nil
}
// Checkpoint should be called every time the state of the cluster op changes.
// It is used to update the copy of the state in activeClusterOperations.
func (s *Scheduler) Checkpoint(clusterOp ClusterOperationInstance) {
// TODO(mberlin): Add here support for persistent checkpoints.
s.muOpList.Lock()
defer s.muOpList.Unlock()
s.activeClusterOperations[clusterOp.Id] = clusterOp.Clone()
}
// GetClusterOperationDetails can be used to query the full details of active or finished operations.

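The scheduler changes above amount to a copy-on-publish pattern: the worker goroutine mutates its local ClusterOperationInstance value, periodically publishes a Clone() into the mutex-guarded activeClusterOperations map via Checkpoint, and findClusterOp hands out yet another clone so callers can never mutate the stored snapshot. A minimal sketch of the idea, with simplified stand-in types rather than the real vitess API:

package main

import (
	"fmt"
	"sync"
)

// Op is a simplified stand-in for ClusterOperationInstance.
type Op struct {
	Id    string
	Tasks []string
}

// Clone deep-copies the slice so snapshots are independent.
func (o Op) Clone() Op {
	o.Tasks = append([]string(nil), o.Tasks...)
	return o
}

// Scheduler publishes snapshots of in-flight operations.
type Scheduler struct {
	mu     sync.Mutex
	active map[string]Op
}

// Checkpoint stores a deep copy, so later mutations by the worker
// cannot race with readers of the map.
func (s *Scheduler) Checkpoint(op Op) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.active[op.Id] = op.Clone()
}

// Find returns another clone, so callers can't mutate the stored snapshot.
func (s *Scheduler) Find(id string) (Op, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	op, ok := s.active[id]
	return op.Clone(), ok
}

func main() {
	s := &Scheduler{active: make(map[string]Op)}
	op := Op{Id: "op1", Tasks: []string{"t1"}}
	s.Checkpoint(op)

	op.Tasks = append(op.Tasks, "t2") // the worker keeps mutating its local copy
	snap, _ := s.Find("op1")
	fmt.Println(snap.Tasks) // [t1] -- the published snapshot is unaffected
}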
View file

@@ -5,6 +5,7 @@
package automation
import (
"strings"
"testing"
"time"
@@ -25,6 +26,30 @@ func newTestScheduler(t *testing.T) *Scheduler {
return scheduler
}
func enqueueClusterOperationAndCheckOutput(t *testing.T, taskName string, expectedOutput string, expectedError string) *pb.ClusterOperation {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()
scheduler.registerClusterOperation("TestingEchoTask")
scheduler.registerClusterOperation("TestingFailTask")
scheduler.registerClusterOperation("TestingEmitEchoTask")
scheduler.registerClusterOperation("TestingEmitEchoFailEchoTask")
scheduler.Run()
enqueueRequest := &pb.EnqueueClusterOperationRequest{
Name: taskName,
Parameters: map[string]string{
"echo_text": expectedOutput,
},
}
enqueueResponse, err := scheduler.EnqueueClusterOperation(context.TODO(), enqueueRequest)
if err != nil {
t.Fatalf("Failed to start cluster operation. Request: %v Error: %v", enqueueRequest, err)
}
return waitForClusterOperation(t, scheduler, enqueueResponse.Id, expectedOutput, expectedError)
}
// waitForClusterOperation is a helper function which blocks until the Cluster Operation has finished.
func waitForClusterOperation(t *testing.T, scheduler *Scheduler, id string, expectedOutputLastTask string, expectedErrorLastTask string) *pb.ClusterOperation {
if expectedOutputLastTask == "" && expectedErrorLastTask == "" {
@@ -42,15 +67,22 @@ func waitForClusterOperation(t *testing.T, scheduler *Scheduler, id string, expe
}
if getDetailsResponse.ClusterOp.State == pb.ClusterOperationState_CLUSTER_OPERATION_DONE {
tc := getDetailsResponse.ClusterOp.SerialTasks
lastTc := tc[len(tc)-1]
// Check the last task container that has finished. (It may not be the final one because tasks can fail.)
var lastTc *pb.TaskContainer
for i := len(tc) - 1; i >= 0; i-- {
if tc[i].ParallelTasks[len(tc[i].ParallelTasks)-1].State == pb.TaskState_DONE {
lastTc = tc[i]
break
}
}
if expectedOutputLastTask != "" {
if lastTc.ParallelTasks[len(lastTc.ParallelTasks)-1].Output != expectedOutputLastTask {
t.Fatalf("ClusterOperation finished but did not return expected output. want: %v Full ClusterOperation details: %v", expectedOutputLastTask, proto.MarshalTextString(getDetailsResponse.ClusterOp))
}
}
if expectedErrorLastTask != "" {
if lastTc.ParallelTasks[len(lastTc.ParallelTasks)-1].Error != expectedErrorLastTask {
t.Fatalf("ClusterOperation finished but did not return expected error. Full ClusterOperation details: %v", getDetailsResponse.ClusterOp)
if lastError := lastTc.ParallelTasks[len(lastTc.ParallelTasks)-1].Error; !strings.Contains(lastError, expectedErrorLastTask) {
t.Fatalf("ClusterOperation finished last error does not contain expected error. got: '%v' want: '%v' Full ClusterOperation details: %v", lastError, expectedErrorLastTask, getDetailsResponse.ClusterOp)
}
}
return getDetailsResponse.ClusterOp
@@ -70,34 +102,29 @@ func TestSchedulerImmediateShutdown(t *testing.T) {
scheduler.ShutdownAndWait()
}
func enqueueClusterOperationAndCheckOutput(t *testing.T, taskName string, expectedOutput string) {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()
scheduler.registerClusterOperation("TestingEchoTask")
scheduler.registerClusterOperation("TestingEmitEchoTask")
scheduler.Run()
enqueueRequest := &pb.EnqueueClusterOperationRequest{
Name: taskName,
Parameters: map[string]string{
"echo_text": expectedOutput,
},
}
enqueueResponse, err := scheduler.EnqueueClusterOperation(context.TODO(), enqueueRequest)
if err != nil {
t.Fatalf("Failed to start cluster operation. Request: %v Error: %v", enqueueRequest, err)
}
waitForClusterOperation(t, scheduler, enqueueResponse.Id, expectedOutput, "")
}
func TestEnqueueSingleTask(t *testing.T) {
enqueueClusterOperationAndCheckOutput(t, "TestingEchoTask", "echoed text")
enqueueClusterOperationAndCheckOutput(t, "TestingEchoTask", "echoed text", "")
}
func TestEnqueueEmittingTask(t *testing.T) {
enqueueClusterOperationAndCheckOutput(t, "TestingEmitEchoTask", "echoed text from emitted task")
enqueueClusterOperationAndCheckOutput(t, "TestingEmitEchoTask", "echoed text from emitted task", "")
}
func TestFailedTaskFailsClusterOperation(t *testing.T) {
enqueueClusterOperationAndCheckOutput(t, "TestingFailTask", "something went wrong", "full error message")
}
func TestFailedTaskFailsWholeClusterOperationEarly(t *testing.T) {
// If a task fails in the middle of a cluster operation, the remaining tasks must not be executed.
details := enqueueClusterOperationAndCheckOutput(t, "TestingEmitEchoFailEchoTask", "", "full error message")
got := details.SerialTasks[2].ParallelTasks[0].Error
want := "full error message"
if got != want {
t.Errorf("TestFailedTaskFailsWholeClusterOperationEarly: got error: '%v' want error: '%v'", got, want)
}
if details.SerialTasks[3].ParallelTasks[0].State != pb.TaskState_NOT_STARTED {
t.Errorf("TestFailedTaskFailsWholeClusterOperationEarly: Task after a failing task must not have been started.")
}
}
func TestEnqueueFailsDueToMissingParameter(t *testing.T) {
@@ -124,24 +151,6 @@ func TestEnqueueFailsDueToMissingParameter(t *testing.T) {
}
}
func TestFailedTaskFailsClusterOperation(t *testing.T) {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()
scheduler.registerClusterOperation("TestingFailTask")
scheduler.Run()
enqueueRequest := &pb.EnqueueClusterOperationRequest{
Name: "TestingFailTask",
}
enqueueResponse, err := scheduler.EnqueueClusterOperation(context.TODO(), enqueueRequest)
if err != nil {
t.Fatalf("Failed to start cluster operation. Request: %v Error: %v", enqueueRequest, err)
}
waitForClusterOperation(t, scheduler, enqueueResponse.Id, "something went wrong", "full error message")
}
func TestEnqueueFailsDueToUnregisteredClusterOperation(t *testing.T) {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()

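One detail of the test changes above: expected errors are now matched with strings.Contains instead of exact equality, so an error that gains context while propagating (e.g. via fmt.Errorf in validateTaskContainers) still satisfies the assertion. A small self-contained illustration:

package main

import (
	"errors"
	"fmt"
	"strings"
)

func main() {
	// A task error that gained context on its way up, similar in spirit to
	// validateTaskContainers wrapping with fmt.Errorf("Error: %v Task: %v", ...).
	base := errors.New("full error message")
	wrapped := fmt.Errorf("Error: %v Task: %v", base, "TestingFailTask")

	want := "full error message"
	fmt.Println(wrapped.Error() == want)                 // false: exact matching breaks
	fmt.Println(strings.Contains(wrapped.Error(), want)) // true: substring still matches
}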
View file

@@ -30,3 +30,14 @@ func NewTaskContainer() *pb.TaskContainer {
func AddTask(t *pb.TaskContainer, taskName string, parameters map[string]string) {
t.ParallelTasks = append(t.ParallelTasks, NewTask(taskName, parameters))
}
// AddMissingTaskID assigns a task id to each task in "tc".
func AddMissingTaskID(tc []*pb.TaskContainer, taskIDGenerator *IDGenerator) {
for _, taskContainer := range tc {
for _, task := range taskContainer.ParallelTasks {
if task.Id == "" {
task.Id = taskIDGenerator.GetNextID()
}
}
}
}
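AddMissingTaskID relies on an IDGenerator whose implementation is not shown in this diff. Assuming it is a concurrency-safe counter handing out unique string ids, a minimal sketch could look like this (hypothetical, for illustration only):

package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// IDGenerator is an assumed implementation: a concurrency-safe counter
// that hands out unique, monotonically increasing string ids.
type IDGenerator struct {
	counter int64
}

// GetNextID returns the next id as a string, matching the call in AddMissingTaskID.
func (ig *IDGenerator) GetNextID() string {
	return strconv.FormatInt(atomic.AddInt64(&ig.counter, 1), 10)
}

func main() {
	gen := &IDGenerator{}
	fmt.Println(gen.GetNextID()) // "1"
	fmt.Println(gen.GetNextID()) // "2"
}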

View file

@@ -15,10 +15,12 @@ func testingTaskCreator(taskName string) Task {
// Tasks for testing only.
case "TestingEchoTask":
return &TestingEchoTask{}
case "TestingEmitEchoTask":
return &TestingEmitEchoTask{}
case "TestingFailTask":
return &TestingFailTask{}
case "TestingEmitEchoTask":
return &TestingEmitEchoTask{}
case "TestingEmitEchoFailEchoTask":
return &TestingEmitEchoFailEchoTask{}
default:
return nil
}
@@ -39,6 +41,18 @@ func (t *TestingEchoTask) RequiredParameters() []string {
return []string{"echo_text"}
}
// TestingFailTask is used only for testing. It always fails.
type TestingFailTask struct {
}
func (t *TestingFailTask) Run(parameters map[string]string) (newTasks []*pb.TaskContainer, output string, err error) {
return nil, "something went wrong", errors.New("full error message")
}
func (t *TestingFailTask) RequiredParameters() []string {
return []string{}
}
// TestingEmitEchoTask is used only for testing. It emits a TestingEchoTask.
type TestingEmitEchoTask struct {
}
@@ -53,14 +67,20 @@ func (t *TestingEmitEchoTask) RequiredParameters() []string {
return []string{}
}
// TestingFailTask is used only for testing. It always fails.
type TestingFailTask struct {
// TestingEmitEchoFailEchoTask is used only for testing.
// It emits three sequential tasks: Echo, Fail, Echo.
type TestingEmitEchoFailEchoTask struct {
}
func (t *TestingFailTask) Run(parameters map[string]string) (newTasks []*pb.TaskContainer, output string, err error) {
return nil, "something went wrong", errors.New("full error message")
func (t *TestingEmitEchoFailEchoTask) Run(parameters map[string]string) (newTasks []*pb.TaskContainer, output string, err error) {
newTasks = []*pb.TaskContainer{
NewTaskContainerWithSingleTask("TestingEchoTask", parameters),
NewTaskContainerWithSingleTask("TestingFailTask", parameters),
NewTaskContainerWithSingleTask("TestingEchoTask", parameters),
}
return newTasks, "emitted tasks: Echo, Fail, Echo", nil
}
func (t *TestingFailTask) RequiredParameters() []string {
func (t *TestingEmitEchoFailEchoTask) RequiredParameters() []string {
return []string{}
}
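All of the Testing* types above satisfy the package's Task interface, which (judging from the method sets shown here) requires Run and RequiredParameters. A self-contained sketch of that contract with a made-up task, using a simplified TaskContainer stand-in instead of the generated protobuf:

package main

import (
	"fmt"
	"strings"
)

// TaskContainer is a hypothetical stand-in for *pb.TaskContainer.
type TaskContainer struct{ Names []string }

// Task is the contract the Testing* types appear to implement:
// Run may emit follow-up task containers plus an output string.
type Task interface {
	Run(parameters map[string]string) (newTasks []*TaskContainer, output string, err error)
	RequiredParameters() []string
}

// UppercaseEchoTask is a made-up example task.
type UppercaseEchoTask struct{}

func (t *UppercaseEchoTask) Run(parameters map[string]string) ([]*TaskContainer, string, error) {
	return nil, strings.ToUpper(parameters["echo_text"]), nil
}

func (t *UppercaseEchoTask) RequiredParameters() []string {
	return []string{"echo_text"}
}

func main() {
	var task Task = &UppercaseEchoTask{}
	_, out, _ := task.Run(map[string]string{"echo_text": "hello"})
	fmt.Println(out) // HELLO
}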

View file

@@ -37,26 +37,34 @@ done
# git doesn't give us access to user input, so let's steal it.
exec < /dev/tty
echo
echo "Lint suggestions were found. They're not enforced, but we're pausing"
echo "to let you know before they get clobbered in the scrollback buffer."
echo
read -r -p 'Press enter to cancel, "s" to step through the warnings or type "ack" to continue: '
if [ "$REPLY" = "ack" ]; then
exit 0
fi
if [ "$REPLY" = "s" ]; then
first_file="true"
if [[ $? -eq 0 ]]; then
# interactive shell. Prompt the user.
echo
echo "Lint suggestions were found. They're not enforced, but we're pausing"
echo "to let you know before they get clobbered in the scrollback buffer."
echo
read -r -p 'Press enter to cancel, "s" to step through the warnings or type "ack" to continue: '
if [ "$REPLY" = "ack" ]; then
exit 0
fi
if [ "$REPLY" = "s" ]; then
first_file="true"
for gofile in "${gofiles_with_warnings[@]}"
do
echo
if [ "$first_file" != "true" ]; then
echo "Press enter to show the warnings for the next file."
read
fi
golint $gofile
first_file="false"
done
fi
else
# non-interactive shell (e.g. called from Eclipse). Just display the errors.
for gofile in "${gofiles_with_warnings[@]}"
do
echo
if [ "$first_file" != "true" ]; then
echo "Press enter to show the warnings for the next file."
read
fi
golint $gofile
first_file="false"
done
fi
exit 1