Skip to content

Commit

Permalink
Merge pull request #152 from jumpserver/pr@v3.6@dispatch_task
Browse files Browse the repository at this point in the history
fix: 修复日志循环打印问题
  • Loading branch information
feng626 authored Sep 4, 2023
2 parents 17660c3 + c70d954 commit a8df310
Showing 1 changed file with 9 additions and 12 deletions.
21 changes: 9 additions & 12 deletions pkg/jms/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/jumpserver/kael/pkg/schemas"
"github.com/jumpserver/wisp/protobuf-go/protobuf"
"go.uber.org/zap"
"io"
)

type PollJMSEvent struct{}
Expand All @@ -29,8 +28,6 @@ func (p *PollJMSEvent) clearZombieSession() {
resp, err := grpc.GlobalGrpcClient.Client.ScanRemainReplays(ctx, req)
if err != nil || !resp.Status.Ok {
logger.GlobalLogger.Error("Failed to scan remain replay")
} else {
logger.GlobalLogger.Info("Scan remain replay success")
}
}

Expand All @@ -40,18 +37,17 @@ func (p *PollJMSEvent) waitForKillSessionMessage() {
logger.GlobalLogger.Error("dispatch task err", zap.Error(err))
return
}
waitChan := make(chan struct{})
logger.GlobalLogger.Info("start dispatch task success")

closeStreamChan := make(chan struct{})
for {
taskResponse, err := stream.Recv()
if err == io.EOF {

if err != nil {
_ = stream.CloseSend()
close(waitChan)
close(closeStreamChan)
break
}
if err != nil {
logger.GlobalLogger.Error("Failed to receive a note", zap.Error(err))
continue
}

task := taskResponse.Task
sessionId := task.SessionId
Expand All @@ -75,7 +71,8 @@ func (p *PollJMSEvent) waitForKillSessionMessage() {
}
}
}
<-waitChan
<-closeStreamChan
p.waitForKillSessionMessage()
}
func (p *PollJMSEvent) sendFinishTask(stream protobuf.Service_DispatchTaskClient, TaskId string) {
req := &protobuf.FinishedTaskRequest{
Expand All @@ -101,7 +98,7 @@ func (p *PollJMSEvent) sendSessionState(jmss *JMSSession, state schemas.SessionS
response := &schemas.AskResponse{
Type: schemas.Waiting,
ConversationID: jmss.Session.Id,
SystemMessage: msg,
SystemMessage: msg,
Meta: schemas.ResponseMeta{SessionState: state},
}

Expand Down

0 comments on commit a8df310

Please sign in to comment.