Skip to content

Commit

Permalink
Merge pull request #2 from limanmys/feature/report-queue-support
Browse files Browse the repository at this point in the history
Feature/report queue support
  • Loading branch information
zekiahmetbayar authored Nov 8, 2023
2 parents 1047b6f + a670d15 commit 6311840
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 3 deletions.
100 changes: 99 additions & 1 deletion app/handlers/queue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package handlers

import (
"errors"

"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/limanmys/render-engine/app/models"
"github.com/limanmys/render-engine/internal/database"
"github.com/limanmys/render-engine/internal/liman"
"github.com/limanmys/render-engine/internal/process_queue"
"github.com/limanmys/render-engine/pkg/helpers"
"github.com/limanmys/render-engine/pkg/logger"
"gorm.io/gorm"
)
Expand All @@ -21,9 +25,26 @@ func NewQueueHandler() *QueueHandler {
}

func (h *QueueHandler) Create(c *fiber.Ctx) error {
var formData map[string]string
queue := &models.Queue{}
if err := c.BodyParser(&queue); err != nil {
return err
formData = helpers.GetFormData(c)
if formData == nil {
return err
}
}

if queue.Type == "" {
queue = &models.Queue{
Type: models.Operation(formData["type"]),
Data: map[string]interface{}{
"extension_id": formData["extension_id"],
"server_id": formData["server_id"],
"user_id": formData["user_id"],
"target": formData["target"],
"payload": formData["payload"],
},
}
}

if queue.Data["server_id"] == nil {
Expand Down Expand Up @@ -61,8 +82,85 @@ func (h *QueueHandler) Create(c *fiber.Ctx) error {
DB: h.db,
UserID: c.Locals("user_id").(string),
}
go processor.Process()
case models.OperationReport:
processor = &process_queue.CreateReport{
Queue: queue,
DB: h.db,
}

go processor.Process()
}

return c.JSON(queue)
}

func (h *QueueHandler) Index(c *fiber.Ctx) error {
extension_id, err := uuid.Parse(c.FormValue("extension_id"))
if err != nil {
return errors.New("invalid extension id")
}

user_id, err := uuid.Parse(c.FormValue("user_id"))
if err != nil {
return errors.New("invalid user id")
}

server_id, err := uuid.Parse(c.FormValue("server_id"))
if err != nil {
return errors.New("invalid server id")
}

if c.FormValue("queue_type") == "" {
return errors.New("invalid queue type")
}

var queues []*models.Queue
if err := h.db.Model(&models.Queue{}).
Where("type = ?", c.Params("queue_type")).
Where("data->>'extension_id' ?", extension_id).
Where("data->>'server_id' ?", server_id).
Where("data->>'user_id' ?", user_id).Find(&queues).Error; err != nil {
return err
}

return c.JSON(queues)
}

func (h *QueueHandler) Delete(c *fiber.Ctx) error {
// Parse uuid
uid_, err := uuid.Parse(c.Params("id"))
if err != nil {
return err
}

extension_id, err := uuid.Parse(c.FormValue("extension_id"))
if err != nil {
return errors.New("invalid extension id")
}

user_id, err := uuid.Parse(c.FormValue("user_id"))
if err != nil {
return errors.New("invalid user id")
}

server_id, err := uuid.Parse(c.FormValue("server_id"))
if err != nil {
return errors.New("invalid server id")
}

if c.FormValue("queue_type") == "" {
return errors.New("invalid queue type")
}

if err := h.db.Model(&models.Queue{}).
Where("type = ?", c.FormValue("queue_type")).
Where("id = ?", uid_).
Where("data->>'extension_id' ?", extension_id).
Where("data->>'server_id' ?", server_id).
Where("data->>'user_id' ?", user_id).Delete(models.Queue{}).Error; err != nil {
return err
}

return c.JSON("Item deleted successfully.")
}
12 changes: 10 additions & 2 deletions app/models/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Operation string
type Status string

const (
OperationReport Operation = "report"
OperationCreate Operation = "create"
OperationUpdate Operation = "update"
OperationInstall Operation = "install"
Expand All @@ -26,12 +27,13 @@ const (
// Queue structure of Queue object
type Queue struct {
ID string `json:"id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Type Operation `json:"type"`
Status Status `json:"status"`
Data gormjsonb.JSONB `json:"data" gorm:"type:jsonb;index,type:gin"`
Path string `json:"path"`
Error string `json:"error"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

func (Queue) TableName() string {
Expand Down Expand Up @@ -62,3 +64,9 @@ func (q *Queue) UpdateError(err string) {
q.Status = StatusFailed
database.Connection().Model(q).Save(q)
}

func (q *Queue) UpdateAsDone(path string) {
q.Path = path
q.Status = StatusDone
database.Connection().Model(q).Save(q)
}
2 changes: 2 additions & 0 deletions app/routes/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func Install(app *fiber.App) {
// queue handler
queueHandler := handlers.NewQueueHandler()
app.Post("/queue", queueHandler.Create)
app.Get("/queue", queueHandler.Index)
app.Delete("/queue/:id", queueHandler.Delete)

// cronjob
app.Post("/cronjobs", handlers.CreateCronJob)
Expand Down
102 changes: 102 additions & 0 deletions internal/process_queue/create_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package process_queue

import (
b64 "encoding/base64"
"strings"

"github.com/google/uuid"
"github.com/limanmys/render-engine/app/models"
"github.com/limanmys/render-engine/internal/liman"
"github.com/limanmys/render-engine/internal/sandbox"
"github.com/limanmys/render-engine/internal/user_token"
"github.com/limanmys/render-engine/pkg/helpers"
"github.com/limanmys/render-engine/pkg/linux"
"gorm.io/gorm"
)

type CreateReport struct {
Queue *models.Queue
DB *gorm.DB
}

func (c CreateReport) Process() error {
// Update cronjob as processing
c.Queue.UpdateStatus(models.StatusProcessing)
// Check extension
extension, err := liman.GetExtension(&models.Extension{
ID: c.Queue.Data["extension_id"].(string),
})
if err != nil {
// Update job as failed
c.Queue.UpdateError(err.Error())
return err
}
// Check extension status
if extension.Status == "0" {
// Update job as failed
c.Queue.UpdateError("extension is unavailable")
return err
}

// Get credentials
credentials := &models.Credentials{}
if extension.RequireKey == "true" {
credentials, err = liman.GetCredentials(
&models.User{
ID: c.Queue.Data["user_id"].(string),
},
&models.Server{
ID: c.Queue.Data["server_id"].(string),
},
)
// Check errors and username is valid
if err != nil || len(credentials.Username) < 1 {
// Update job as failed
c.Queue.UpdateError("you need a key to use this extension")
return err
}
}

// Encode to b64 and set as form value
formValues := make(map[string]string)
formValues["data"] = b64.StdEncoding.EncodeToString([]byte(c.Queue.Data["payload"].(string)))

// Generate token for user
token, err := user_token.Create(c.Queue.Data["user_id"].(string))
if err != nil {
// Update job as failed
c.Queue.UpdateError(err.Error())
return err
}

// Generate new id for logs
log_id := uuid.New()

// Generate command
command, err := sandbox.GenerateCommand(
extension,
credentials,
&models.CommandParams{
TargetFunction: c.Queue.Data["target"].(string),
Locale: helpers.Env("APP_LANG", "tr"),
Extension: c.Queue.Data["extension_id"].(string),
Server: c.Queue.Data["server_id"].(string),
User: c.Queue.Data["user_id"].(string),
LogID: log_id.String(),
RequestData: formValues,
BaseURL: "https://127.0.0.1",
Token: token,
},
)
if err != nil {
// Update job as failed
c.Queue.UpdateError(err.Error())

return err
}

output := linux.Execute(command)

c.Queue.UpdateAsDone(strings.TrimSpace(strings.ReplaceAll(output, "\"", "")))
return nil
}

0 comments on commit 6311840

Please sign in to comment.