Code Coverage |
||||||||||
Classes and Traits |
Functions and Methods |
Lines |
||||||||
Total | |
0.00% |
0 / 1 |
|
20.00% |
2 / 10 |
CRAP | |
29.79% |
28 / 94 |
TrackerPullProcess | |
0.00% |
0 / 1 |
|
20.00% |
2 / 10 |
104.61 | |
29.79% |
28 / 94 |
__construct | |
100.00% |
1 / 1 |
1 | |
100.00% |
10 / 10 |
|||
handle | |
0.00% |
0 / 1 |
10.31 | |
26.67% |
8 / 30 |
|||
normalizeConnectionConfig | |
100.00% |
1 / 1 |
1 | |
100.00% |
8 / 8 |
|||
setMongoConfig | |
0.00% |
0 / 1 |
1.04 | |
66.67% |
2 / 3 |
|||
getData | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 9 |
|||
infoHeader | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 6 |
|||
insert | |
0.00% |
0 / 1 |
6 | |
0.00% |
0 / 13 |
|||
delete | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 7 |
|||
getMinMax | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 3 |
|||
getQuery | |
0.00% |
0 / 1 |
12 | |
0.00% |
0 / 5 |
1 | <?php |
2 | |
3 | namespace Qmp\Laravel\Collect\Console\Commands; |
4 | |
5 | use Exception; |
6 | use MongoDB\BSON\ObjectId; |
7 | use Illuminate\Support\{Collection, Str}; |
8 | use Illuminate\Support\Facades\{DB, Log, Redis}; |
9 | use Qmp\Laravel\CommandsLaravel\Middleware\Library\Monitoring\{AddSubProcess, UpdateSubProcess}; |
10 | use Qmp\Laravel\DBConnector\Config; |
11 | use Qmp\Laravel\DBConnector\Connectors\MongoConnector; |
12 | use Qmp\Laravel\MicroService\Client\Client; |
13 | use Qmp\Laravel\MicroService\Client\Tools\Request as ClientRequest; |
14 | use Qmp\Laravel\AsyncProcessFactory\Traits\ProcessFactoryStatusModel; |
15 | use Qmp\Laravel\ToolsLaravel\Traits\Timer; |
16 | |
17 | class TrackerPullProcess extends \Commando |
18 | { |
19 | use Timer, ProcessFactoryStatusModel; |
20 | |
21 | /** |
22 | * the local mongo connection |
23 | * |
24 | * @var string |
25 | */ |
26 | const CONNECTION = 'collectMongo'; |
27 | |
28 | /** |
29 | * The distant collection name, used as prefix in local |
30 | * |
31 | * @var string |
32 | */ |
33 | const COLLECTION = 'data_collector'; |
34 | |
35 | /** |
36 | * The name and signature of the console command. |
37 | * |
38 | * @var string |
39 | */ |
40 | protected $signature = 'collect:tracker-pull-process {id} {parentId}'; |
41 | |
42 | /** |
43 | * The console command description. |
44 | * |
45 | * @var string |
46 | */ |
47 | protected $description = 'Pull all data from trackers'; |
48 | |
49 | /** |
50 | * The mongo connector used on tracker |
51 | * |
52 | * @var MongoConnector |
53 | */ |
54 | protected $mongo; |
55 | |
56 | /** |
57 | * Undocumented variable |
58 | * |
59 | * @var [type] |
60 | */ |
61 | protected $redis; |
62 | |
63 | /** |
64 | * The distant database |
65 | * |
66 | * @var string |
67 | */ |
68 | protected $database = 'tracker'; |
69 | |
70 | /** |
71 | * Size if the temporary collection |
72 | * |
73 | * @var integer |
74 | */ |
75 | protected $chunkSize = 10000; |
76 | |
77 | /** |
78 | * Limit volume to pull on tracker |
79 | * |
80 | * @var integer |
81 | */ |
82 | protected $limit = 100000; |
83 | |
84 | /** |
85 | * The insert message |
86 | * |
87 | * @var string |
88 | */ |
89 | protected $messageInsert = "<info>Inserting %s entries from chunk #%s ... </info> "; |
90 | |
91 | /** |
92 | * The delete message |
93 | * |
94 | * @var string |
95 | */ |
96 | protected $messageDelete = "<info>Deleting chunk #%s on tracker ... </info>"; |
97 | |
98 | /** |
99 | * Undocumented variable |
100 | * |
101 | * @var [type] |
102 | */ |
103 | protected $middlewareDataUpdate; |
104 | |
105 | /** |
106 | * Class Constructor |
107 | */ |
108 | public function __construct() |
109 | { |
110 | parent::__construct(); |
111 | |
112 | $this->mongo = new MongoConnector(); |
113 | $this->mongo->collection(self::COLLECTION); |
114 | $this->redis = Redis::connection('tracker'); |
115 | $this->middlewareDataUpdate = $this->middlewareData(UpdateSubProcess::class)->get(); |
116 | $this->middlewareDataUpdate->log['collections'] = []; |
117 | |
118 | $middlewares = [ |
119 | AddSubProcess::class, |
120 | UpdateSubProcess::class |
121 | ]; |
122 | |
123 | $this->middleware($middlewares); |
124 | $this->middlewareData($middlewares)->createId(); |
125 | } |
126 | |
127 | /** |
128 | * Execute the console command. |
129 | * |
130 | * @return void |
131 | * @throws Exception |
132 | */ |
133 | public function handle(): void |
134 | { |
135 | $this->startTimer('total'); |
136 | |
137 | $connection = $this->normalizeConnectionConfig($this->argument('id')); |
138 | $trackerName = $connection->get('name'); |
139 | |
140 | try { |
141 | |
142 | $this->setMongoConfig($connection); |
143 | $this->infoHeader($trackerName); |
144 | $data = $this->getData($trackerName); |
145 | |
146 | if ($data->count()) { |
147 | $this->redis->set($trackerName, $this->getMinMax($data)->toJson()); |
148 | |
149 | $this->startTimer('insert'); |
150 | |
151 | $chunks = $data->chunk($this->chunkSize); |
152 | $chunks->each(function ($chunk, $key) { |
153 | try { |
154 | $this->insert($chunk, $key) |
155 | ->delete($chunk, $key); |
156 | } catch (\Exception $e) { |
157 | Log::debug(var_export($e->getMessage(), true)); |
158 | } |
159 | $this->error($this->getTimer('insert', true)); |
160 | $this->line(''); |
161 | }); |
162 | |
163 | $this->redis->del($trackerName); |
164 | } else { |
165 | $this->info('Up to date !'); |
166 | $this->line(''); |
167 | } |
168 | |
169 | // Pass data to middleware |
170 | $this->middlewareDataUpdate->log = array_merge($this->middlewareDataUpdate->log, [ |
171 | 'count' => $data->count() . ' ' . Str::plural('entry', $data->count()), |
172 | 'duration' => $this->getTimer('total'), |
173 | 'tracker' => $connection->get('name'), |
174 | ]); |
175 | |
176 | $this->error('Total Time : ' . $this->middlewareDataUpdate->log['duration']); |
177 | } catch (Exception $e) { |
178 | $this->line('Error: ' . $e->getMessage()); |
179 | $this->middlewareDataUpdate->error = $this->reportException($e); |
180 | } |
181 | } |
182 | |
183 | /** |
184 | * Undocumented function |
185 | * |
186 | * @param string $connection |
187 | * @return Collection |
188 | */ |
189 | protected function normalizeConnectionConfig($id) |
190 | { |
191 | $request = ClientRequest::createObject('service_tracker', "config/" . $id); |
192 | $clientResponse = Client::systemSend('get', $request); |
193 | |
194 | $tempCollection = collect($clientResponse->content); |
195 | |
196 | return collect([ |
197 | 'host' => $tempCollection->get('ip'), |
198 | 'name' => $tempCollection->get('name'), |
199 | 'database' => $this->database |
200 | ])->merge($tempCollection->get('mongo')); |
201 | } |
202 | |
203 | /** |
204 | * Undocumented function |
205 | * |
206 | * @param Collection $config |
207 | * @return void |
208 | */ |
209 | protected function setMongoConfig(Collection $config) |
210 | { |
211 | $config = new Config($config->except('name')->toArray()); |
212 | $this->mongo->connection($config); |
213 | } |
214 | |
215 | /** |
216 | * Fetch all datas from the current tracker |
217 | * |
218 | * @paral string $trackerName |
219 | * @return Collection |
220 | */ |
221 | protected function getData($trackerName): Collection |
222 | { |
223 | $this->output->write('<info>Get entries from tracker ...</info>', false); |
224 | $this->startTimer('get'); |
225 | |
226 | $datas = $this->mongo->all($this->getQuery($trackerName), [ |
227 | 'sort' => ['_id' => 1], |
228 | 'limit' => $this->limit |
229 | ]); |
230 | |
231 | $this->output->write(' ' . $datas->count() . ' entries found', true); |
232 | $this->error($this->getTimer('get', true)); |
233 | $this->line(''); |
234 | return $datas; |
235 | } |
236 | |
237 | /** |
238 | * The info displayed when the process start |
239 | * |
240 | * @param string $name |
241 | * @return void |
242 | */ |
243 | protected function infoHeader(string $name): void |
244 | { |
245 | $message = "Connecting to $name"; |
246 | |
247 | $this->line(str_repeat('-', strlen($message))); |
248 | $this->line($message); |
249 | $this->line(str_repeat('-', strlen($message))); |
250 | $this->line(''); |
251 | } |
252 | |
253 | /** |
254 | * The local insert |
255 | * |
256 | * @param Collection $chunk |
257 | * @param int $key |
258 | * @return self |
259 | * @throws Exception |
260 | */ |
261 | protected function insert(Collection $chunk, int $key): self |
262 | { |
263 | $this->createProcessStatus('collect:data-distributor'); |
264 | |
265 | $this->output->write(sprintf($this->messageInsert, $chunk->count(), $key), false); |
266 | |
267 | $collection = self::COLLECTION . '_' . $this->id; |
268 | |
269 | $inserted = DB::connection(self::CONNECTION) |
270 | ->collection($collection) |
271 | ->insert($chunk->values()->toArray()); |
272 | |
273 | if (!$inserted) { |
274 | $this->middlewareDataUpdate->error = ['message' => 'Unable to insert local data : ' . $this->id]; |
275 | throw new Exception('Unable to insert local data : ' . $this->id); |
276 | } |
277 | |
278 | $this->middlewareDataUpdate->log['collections'][] = ['collection' => $collection, 'entries' => $chunk->count()]; |
279 | |
280 | $this->output->write('done', false); |
281 | |
282 | $this->activeProcessStatus(); |
283 | |
284 | return $this; |
285 | } |
286 | |
287 | /** |
288 | * The remote delete |
289 | * |
290 | * @param Collection $chunk |
291 | * @param int $key |
292 | * @return void |
293 | */ |
294 | protected function delete(Collection $chunk, int $key): void |
295 | { |
296 | $this->info(''); |
297 | $repeat = strlen(sprintf($this->messageInsert, $chunk->count(), $key)) - strlen(sprintf($this->messageDelete, $key)); |
298 | $this->output->write(sprintf($this->messageDelete, $key), false); |
299 | |
300 | //$query = [ '_id'=> [ '$in' => $chunk->pluck('_id')->toArray()]]; |
301 | $query = $this->getQuery(null, $this->getMinMax($chunk)->toArray()); |
302 | $del = $this->mongo->deleteMany($query); |
303 | |
304 | $this->output->write(str_repeat(" ", $repeat) . 'done (' . $del->getDeletedCount() . ')', true); |
305 | } |
306 | |
307 | /** |
308 | * Undocumented function |
309 | * |
310 | * @param [type] $data |
311 | * @return Collection |
312 | */ |
313 | protected function getMinMax(Collection $data) |
314 | { |
315 | return collect([ |
316 | 'min' => $data->first()->mongoId(true), |
317 | 'max' => $data->last()->mongoId(true), |
318 | ]); |
319 | } |
320 | |
321 | /** |
322 | * Undocumented function |
323 | * |
324 | * @param mixed $collection |
325 | * @return Array |
326 | */ |
327 | protected function getQuery($trackerName, array $array = null) |
328 | { |
329 | $last = $array ? $array : json_decode($this->redis->get($trackerName), true); |
330 | |
331 | if ($last) { |
332 | return [ |
333 | '_id' => [ |
334 | '$gte' => new ObjectId($last['min']), |
335 | '$lte' => new ObjectId($last['max']), |
336 | ] |
337 | ]; |
338 | } |
339 | |
340 | return []; |
341 | } |
342 | } |