Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
25.00% covered (danger)
25.00%
1 / 4
CRAP
20.00% covered (danger)
20.00%
9 / 45
DataDistributorProcess
0.00% covered (danger)
0.00%
0 / 1
25.00% covered (danger)
25.00%
1 / 4
32.09
20.00% covered (danger)
20.00%
9 / 45
 __construct
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
9 / 9
 handle
0.00% covered (danger)
0.00%
0 / 1
20
0.00% covered (danger)
0.00%
0 / 33
 deleteTempCollection
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 2
 getCollection
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 1
1<?php
2
3namespace Qmp\Laravel\Collect\Console\Commands;
4
5use Exception;
6use Illuminate\Support\Facades\{DB, Log, Schema};
7use Jenssegers\Mongodb\Query\Builder;
8use Qmp\Laravel\AsyncProcessFactory\Traits\{ProcessFactoryStarted, ProcessFactoryStatus, ProcessFactoryStatusModel};
9use Qmp\Laravel\CommandsLaravel\Middleware\Library\Monitoring\{AddSubProcess, UpdateSubProcessAsync};
10use Qmp\Laravel\ToolsLaravel\Traits\Timer;
11
class DataDistributorProcess extends \Commando
{

    use Timer, ProcessFactoryStarted, ProcessFactoryStatus, ProcessFactoryStatusModel;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'collect:data-distributor-process {id} {parentId}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Process the given temporary collection';

    /**
     * Name of the db connection used to read the temporary collection.
     * Initialised from TrackerPullProcess::CONNECTION.
     *
     * @var string
     */
    protected $connection;

    /**
     * Name of the temporary collection to process.
     * Starts as TrackerPullProcess::COLLECTION . '_' and gets the {id}
     * argument appended in handle().
     *
     * @var string
     */
    protected $collection;

    /**
     * Data holder returned by middlewareData(UpdateSubProcessAsync::class)->get().
     * This command writes its `log`, `warning` and `error` properties.
     *
     * @var object
     */
    protected $middlewareDataUpdate;

    /**
     * Create a new command instance.
     *
     * Sets the collection prefix and connection, prepares the middleware data
     * holder with an empty "inserted" log, then registers the monitoring
     * middlewares and creates their id.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();

        $this->collection = TrackerPullProcess::COLLECTION . '_';
        $this->connection = TrackerPullProcess::CONNECTION;

        $this->middlewareDataUpdate = $this->middlewareData(UpdateSubProcessAsync::class)->get();
        $this->middlewareDataUpdate->log['inserted'] = [];

        $middlewares = [
            AddSubProcess::class,
            UpdateSubProcessAsync::class
        ];

        $this->middleware($middlewares);
        $this->middlewareData($middlewares)->createId();
    }

    /**
     * Execute the console command.
     *
     * Reads every document of the temporary collection, groups them by their
     * `type` field and bulk-inserts each group into
     * TrackerPullProcess::COLLECTION. Write errors raised by the bulk insert
     * are reported as duplicates in the middleware warnings. On success the
     * process status and the temporary collection are deleted and the totals
     * are stored in the middleware log.
     *
     * Any Exception raised inside the try block is passed to reportException()
     * and stored in the middleware data `error` property.
     *
     * @return void
     */
    public function handle(): void
    {
        $this->startTimer('total');

        try {
            $this->id = $this->argument('id');
            $this->collection .= $this->id;
            $totalInserted = 0;
            $totalDuplicates = 0;

            // Used by the factory to ensure that the process is started
            $this->hasStarted();

            $this->updateProcessStatus(getmypid(), array_merge($this->options(), $this->arguments()));

            $groups = $this->getCollection($this->connection, $this->collection)->get()->groupBy('type');

            $groups->each(function ($group, $type) use (&$totalDuplicates, &$totalInserted) {
                Log::debug(collect(config('collect.' . $type)));

                try {
                    $result = DB::connection(TrackerPullProcess::CONNECTION)
                        ->getMongoDb()
                        ->selectCollection(TrackerPullProcess::COLLECTION)
                        ->insertMany($group->toArray(), ['ordered' => false]);

                    $inserted = $result->getInsertedCount();
                    $totalInserted += $inserted;

                    $this->middlewareDataUpdate->log['inserted'][] = [
                        'type' => $type,
                        'count' => $inserted,
                    ];
                } catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
                    $writeResult = $e->getWriteResult();
                    $errorCount = count($writeResult->getWriteErrors());

                    // The insert is unordered, so the documents that did not
                    // fail were still written before the exception was thrown:
                    // they must be counted as inserted.
                    $inserted = (int) $writeResult->getInsertedCount();

                    if ($inserted > 0) {
                        $totalInserted += $inserted;

                        $this->middlewareDataUpdate->log['inserted'][] = [
                            'type' => $type,
                            'count' => $inserted,
                        ];
                    }

                    if (!isset($this->middlewareDataUpdate->warning)) {
                        $this->middlewareDataUpdate->warning = [];
                    }

                    // NOTE(review): every write error is reported as a duplicate;
                    // the error codes are not inspected.
                    $this->middlewareDataUpdate->warning[] = [
                        'type' => 'duplicates',
                        'dataType' => $type,
                        'count' => $errorCount
                    ];

                    $totalDuplicates += $errorCount;

                    Log::debug("Skip duplicates (type: $type) :" . $errorCount);
                }
            });

            $this->deleteProcessStatus();
            $this->deleteTempCollection();

            $this->middlewareDataUpdate->log['totalInserted'] = $totalInserted;
            $this->middlewareDataUpdate->log['totalDuplicates'] = $totalDuplicates;
            $this->middlewareDataUpdate->log['duration'] = $this->getTimer('total');
        } catch (Exception $e) {
            $this->middlewareDataUpdate->error = $this->reportException($e);
        }
    }

    /**
     * Drop the temporary collection once it has been processed.
     *
     * The collection is dropped on the same connection it was read from.
     *
     * @return void
     */
    protected function deleteTempCollection()
    {
        Schema::connection($this->connection)->drop($this->collection);
    }

    /**
     * Return a query builder for the given collection.
     *
     * @param string $connection Name of the db connection
     * @param string $collection Name of the collection
     * @return \Jenssegers\Mongodb\Query\Builder
     */
    protected function getCollection(string $connection, string $collection): Builder
    {
        return DB::connection($connection)->collection($collection);
    }
}